[ASTERIXDB-2444][STO] Avoid Using System Clock in Storage
- user model changes: no
- storage format changes: yes
- interface changes: yes
Details:
- Replace the usage of system clock timestamps in LSM
index component file names with a sequencer. The next
sequence id to use is determined by checking the list
of existing components on disk. Note that due to a
rollback, an index checkpoint file may have a last valid
component sequence which is greater than what is on disk.
This should not cause any issues since only components
that have a sequence greater than the one that appears in
the checkpoint will be deleted.
- Replace the usage of system clock timestamps in LSM
index component ids with a monotonically increasing
sequencer. The sequencer is initialized after restarts
with the last valid component id that appears in the
index checkpoint.
- Refactor the logic to generate flush/merge file names.
- Refactor the logic to check invalid components.
- Adapt test cases to new naming format.
Change-Id: I9dff8ffb38ce8064a199d03b070ed1f5b924b8a4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2927
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 64d8e93..3c62d99 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -30,7 +30,6 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
-import java.util.Optional;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IndexCheckpoint;
@@ -56,7 +55,7 @@
}
@Override
- public synchronized void init(String lastComponentTimestamp, long lsn) throws HyracksDataException {
+ public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException {
List<IndexCheckpoint> checkpoints;
try {
checkpoints = getCheckpoints();
@@ -67,25 +66,24 @@
LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
delete();
}
- IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lastComponentTimestamp, lsn);
+ IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn);
persist(firstCheckpoint);
}
@Override
- public synchronized void replicated(String componentTimestamp, long masterLsn, long componentId)
+ public synchronized void replicated(long componentSequence, long masterLsn, long componentId)
throws HyracksDataException {
final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
if (localLsn == null) {
throw new IllegalStateException("Component flushed before lsn mapping was received");
}
- flushed(componentTimestamp, localLsn, componentId);
+ flushed(componentSequence, localLsn, componentId);
}
@Override
- public synchronized void flushed(String componentTimestamp, long lsn, long componentId)
- throws HyracksDataException {
+ public synchronized void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
- IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp, componentId);
+ IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentSequence, componentId);
persist(nextCheckpoint);
deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
}
@@ -95,7 +93,7 @@
final IndexCheckpoint latest = getLatest();
latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
- latest.getValidComponentTimestamp(), latest.getLastComponentId());
+ latest.getValidComponentSequence(), latest.getLastComponentId());
persist(next);
notifyAll();
}
@@ -119,9 +117,8 @@
}
@Override
- public Optional<String> getValidComponentTimestamp() throws HyracksDataException {
- String validComponentTimestamp = getLatest().getValidComponentTimestamp();
- return validComponentTimestamp != null ? Optional.of(validComponentTimestamp) : Optional.empty();
+ public long getValidComponentSequence() throws HyracksDataException {
+ return getLatest().getValidComponentSequence();
}
@Override
@@ -153,18 +150,17 @@
@Override
public synchronized void setLastComponentId(long componentId) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
- final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
- latest.getValidComponentTimestamp(), componentId);
+ final IndexCheckpoint next =
+ IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentSequence(), componentId);
persist(next);
}
@Override
- public synchronized void advanceValidComponentTimestamp(String timestamp) throws HyracksDataException {
+ public synchronized void advanceValidComponentSequence(long componentSequence) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
- if (latest.getValidComponentTimestamp() == null
- || timestamp.compareTo(latest.getValidComponentTimestamp()) > 0) {
- final IndexCheckpoint next =
- IndexCheckpoint.next(latest, latest.getLowWatermark(), timestamp, latest.getLastComponentId());
+ if (componentSequence > latest.getValidComponentSequence()) {
+ final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence,
+ latest.getLastComponentId());
persist(next);
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
index 108afa7..f4c0ea6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
@@ -28,7 +28,7 @@
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -64,8 +64,8 @@
// Whenever this is called, it resets the counter
// However, the counters for the failed operations are never reset since we expect them
// To be always 0
- return new TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
- getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider());
+ return new TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(),
+ getIndexCheckpointManagerProvider());
}
public int getTotalFlushes() {
@@ -125,9 +125,9 @@
public class TestLsmIoOpCallback extends LSMIOOperationCallback {
private final TestLsmBtree lsmBtree;
- public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentId id,
+ public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentIdGenerator idGenerator,
IIndexCheckpointManagerProvider checkpointManagerProvider) {
- super(dsInfo, index, id, checkpointManagerProvider);
+ super(dsInfo, index, idGenerator, checkpointManagerProvider);
lsmBtree = (TestLsmBtree) index;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 694a0c7..54ae683 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -18,18 +18,11 @@
*/
package org.apache.asterix.test.storage;
-import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
-
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.text.Format;
-import java.text.SimpleDateFormat;
import java.util.Arrays;
-import java.util.Date;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.TestDataUtil;
@@ -129,17 +122,15 @@
TestDataUtil.upsertData(datasetName, 100);
ncAppCtx.getDatasetLifecycleManager().flushDataset(dataset.getDatasetId(), false);
- // create new invalid component with a timestamp > checkpoint valid component timestamp (i.e. in the future)
- Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
- Date futureTime = new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5));
- String invalidComponentTimestamp =
- formatter.format(futureTime) + AbstractLSMIndexFileManager.DELIMITER + formatter.format(futureTime);
+ // create new invalid component sequence with a sequence > checkpoint valid component sequence
+ String invalidComponentId = "1000";
+ String invalidComponentRange = invalidComponentId + AbstractLSMIndexFileManager.DELIMITER + invalidComponentId;
FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
String indexDir = indexDirRef.getFile().getAbsolutePath();
// create the invalid component files
- Path btreePath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+ Path btreePath = Paths.get(indexDir, invalidComponentRange + AbstractLSMIndexFileManager.DELIMITER
+ AbstractLSMIndexFileManager.BTREE_SUFFIX);
- Path filterPath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER
+ Path filterPath = Paths.get(indexDir, invalidComponentRange + AbstractLSMIndexFileManager.DELIMITER
+ AbstractLSMIndexFileManager.BLOOM_FILTER_SUFFIX);
Files.createFile(btreePath);
Files.createFile(filterPath);
@@ -156,14 +147,14 @@
DatasetResourceReference drr = DatasetResourceReference.of(localResource);
IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
- Optional<String> validComponentTimestamp = indexCheckpointManager.getValidComponentTimestamp();
- Assert.assertTrue(validComponentTimestamp.isPresent());
+ long validComponentSequence = indexCheckpointManager.getValidComponentSequence();
+ Assert.assertTrue(validComponentSequence > Long.MIN_VALUE);
File[] indexRemainingFiles =
indexDirRef.getFile().listFiles(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
Assert.assertNotNull(indexRemainingFiles);
long validComponentFilesCount = Arrays.stream(indexRemainingFiles)
- .filter(file -> file.getName().startsWith(validComponentTimestamp.get())).count();
+ .filter(file -> file.getName().startsWith(String.valueOf(validComponentSequence))).count();
Assert.assertTrue(validComponentFilesCount > 0);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 50a4bef..682eaea 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.context;
import static org.apache.asterix.common.metadata.MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS;
+import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID;
import java.io.IOException;
import java.io.OutputStream;
@@ -112,6 +113,9 @@
datasetResource = getDatasetLifecycle(did);
}
datasetResource.register(resource, (ILSMIndex) index);
+ if (((ILSMIndex) index).isPrimaryIndex()) {
+ initializeDatasetPartitionValidComponentId(datasetResource, resource);
+ }
}
private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -600,4 +604,24 @@
indexInfo.setOpen(false);
}
}
+
+ private void initializeDatasetPartitionValidComponentId(DatasetResource datasetResource,
+ LocalResource primaryIndexResource) {
+ final IndexInfo indexInfo = datasetResource.getIndexInfo(primaryIndexResource.getId());
+ final int partition = indexInfo.getPartition();
+ final ILSMComponentIdGenerator componentIdGenerator =
+ getComponentIdGenerator(datasetResource.getDatasetID(), partition);
+ final long indexLastValidComponentId = getIndexLastValidComponentId(indexInfo.getLocalResource());
+ componentIdGenerator.init(indexLastValidComponentId);
+ }
+
+ private long getIndexLastValidComponentId(LocalResource resource) {
+ try {
+ final DatasetResourceReference datasetResource = DatasetResourceReference.of(resource);
+ return Math.max(indexCheckpointManagerProvider.get(datasetResource).getLatest().getLastComponentId(),
+ MIN_VALID_COMPONENT_ID);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index ea53d68..606d63c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
@@ -44,9 +45,9 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -68,17 +69,19 @@
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
protected final DatasetInfo dsInfo;
protected final ILSMIndex lsmIndex;
+ private final ILSMComponentIdGenerator componentIdGenerator;
private long firstLsnForCurrentMemoryComponent = 0L;
private long persistenceLsn = 0L;
private int pendingFlushes = 0;
private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
+ private boolean firstAllocation = true;
- public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId nextComponentId,
+ public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentIdGenerator componentIdGenerator,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.dsInfo = dsInfo;
this.lsmIndex = lsmIndex;
+ this.componentIdGenerator = componentIdGenerator;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
- componentIds.add(nextComponentId);
}
@Override
@@ -132,8 +135,8 @@
operation.getIOOpertionType() == LSMIOOperationType.FLUSH ? (Long) map.get(KEY_FLUSH_LOG_LSN) : 0L;
final LSMComponentId id = (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID);
final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
- final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
- indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn, id.getMaxId());
+ final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
+ indexCheckpointManagerProvider.get(ref).flushed(componentSequence, lsn, id.getMaxId());
}
private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
@@ -275,6 +278,9 @@
@Override
public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
- // No Op
+ if (firstAllocation) {
+ firstAllocation = false;
+ componentIds.add(componentIdGenerator.getId());
+ }
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
index 25cd8b2..68ffd6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
@@ -70,8 +70,8 @@
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
- return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
- getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider());
+ return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(),
+ getIndexCheckpointManagerProvider());
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index 2c0872c..dd9ede5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -27,33 +27,33 @@
/**
* Initializes the first checkpoint of an index with low watermark {@code lsn}
*
- * @param componentTimestamp
+ * @param validComponentSequence
* @param lsn
* @throws HyracksDataException
*/
- void init(String componentTimestamp, long lsn) throws HyracksDataException;
+ void init(long validComponentSequence, long lsn) throws HyracksDataException;
/**
* Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
- * with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
+ * with the latest valid {@code componentSequence} and low watermark {@code lsn}
*
- * @param componentTimestamp
+ * @param componentSequence
* @param lsn
* @throws HyracksDataException
*/
- void flushed(String componentTimestamp, long lsn, long componentId) throws HyracksDataException;
+ void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException;
/**
* Called when a new LSM disk component is replicated from master. When called, the index checkpoint is updated
- * with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
+ * with the latest valid {@code componentSequence} and the local lsn mapping of {@code masterLsn} is set as the
* new low watermark.
*
- * @param componentTimestamp
+ * @param componentSequence
* @param masterLsn
* @param componentId
* @throws HyracksDataException
*/
- void replicated(String componentTimestamp, long masterLsn, long componentId) throws HyracksDataException;
+ void replicated(long componentSequence, long masterLsn, long componentId) throws HyracksDataException;
/**
* Called when a flush log is received and replicated from master. The mapping between
@@ -88,12 +88,12 @@
void delete();
/**
- * Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()}
+ * Gets the index last valid component sequence.
*
- * @return the index last valid component timestamp
+ * @return the index last valid component sequence
* @throws HyracksDataException
*/
- Optional<String> getValidComponentTimestamp() throws HyracksDataException;
+ long getValidComponentSequence() throws HyracksDataException;
/**
* Gets the number of valid checkpoints the index has.
@@ -110,12 +110,12 @@
IndexCheckpoint getLatest() throws HyracksDataException;
/**
- * Advance the last valid component timestamp. Used for replicated bulkloaded components
+ * Advance the last valid component sequence. Used for replicated bulkloaded components
*
- * @param timeStamp
+ * @param componentSequence
* @throws HyracksDataException
*/
- void advanceValidComponentTimestamp(String timeStamp) throws HyracksDataException;
+ void advanceValidComponentSequence(long componentSequence) throws HyracksDataException;
/**
* Set the last component id. Used during recovery or after component delete
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 73d3122..f84167e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -34,22 +34,22 @@
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long INITIAL_CHECKPOINT_ID = 0;
private long id;
- private String validComponentTimestamp;
+ private long validComponentSequence;
private long lowWatermark;
private long lastComponentId;
private Map<Long, Long> masterNodeFlushMap;
- public static IndexCheckpoint first(String lastComponentTimestamp, long lowWatermark) {
+ public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) {
IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
firstCheckpoint.lowWatermark = lowWatermark;
- firstCheckpoint.validComponentTimestamp = lastComponentTimestamp;
+ firstCheckpoint.validComponentSequence = lastComponentSequence;
firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
firstCheckpoint.masterNodeFlushMap = new HashMap<>();
return firstCheckpoint;
}
- public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp,
+ public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
long lastComponentId) {
if (lowWatermark < latest.getLowWatermark()) {
throw new IllegalStateException("Low watermark should always be increasing");
@@ -58,7 +58,7 @@
next.id = latest.getId() + 1;
next.lowWatermark = lowWatermark;
next.lastComponentId = lastComponentId;
- next.validComponentTimestamp = validComponentTimestamp;
+ next.validComponentSequence = validComponentSequence;
next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
// remove any lsn from the map that wont be used anymore
next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark);
@@ -69,8 +69,8 @@
private IndexCheckpoint() {
}
- public String getValidComponentTimestamp() {
- return validComponentTimestamp;
+ public long getValidComponentSequence() {
+ return validComponentSequence;
}
public long getLowWatermark() {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 29a2aa0..4b18e1d 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -19,9 +19,8 @@
package org.apache.asterix.test.ioopcallbacks;
-import java.text.Format;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID;
+
import java.util.HashMap;
import java.util.Map;
@@ -38,7 +37,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -64,13 +62,11 @@
* 7. destroy
*/
- private static final Format FORMATTER =
- new SimpleDateFormat(AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT);
+ private static long COMPONENT_SEQUENCE = 0;
private static String getComponentFileName() {
- Date date = new Date();
- String ts = FORMATTER.format(date);
- return ts + '_' + ts;
+ final String sequence = String.valueOf(COMPONENT_SEQUENCE++);
+ return sequence + '_' + sequence;
}
@Test
@@ -83,8 +79,9 @@
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
DatasetInfo dsInfo = new DatasetInfo(101, null);
LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
- LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
- mockIndexCheckpointManagerProvider());
+ idGenerator.init(MIN_VALID_COMPONENT_ID);
+ LSMIOOperationCallback callback =
+ new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
//Flush first
idGenerator.refresh();
long flushLsn = 1L;
@@ -142,21 +139,18 @@
int numMemoryComponents = 2;
DatasetInfo dsInfo = new DatasetInfo(101, null);
ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
+ idGenerator.init(MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
- mockIndexCheckpointManagerProvider());
+ LSMIOOperationCallback callback =
+ new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
+ callback.allocated(mockComponent);
ILSMComponentId initialId = idGenerator.getId();
- // simulate a partition is flushed before allocated
idGenerator.refresh();
long flushLsn = 1L;
ILSMComponentId nextComponentId = idGenerator.getId();
- Map<String, Object> flushMap = new HashMap<>();
- flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
- flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
- callback.allocated(mockComponent);
callback.recycled(mockComponent);
checkMemoryComponent(initialId, mockComponent);
}
@@ -166,13 +160,15 @@
int numMemoryComponents = 2;
DatasetInfo dsInfo = new DatasetInfo(101, null);
ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents);
+ idGenerator.init(MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
- mockIndexCheckpointManagerProvider());
+ LSMIOOperationCallback callback =
+ new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
String indexId = "mockIndexId";
+ callback.allocated(mockComponent);
ILSMComponentId id = idGenerator.getId();
callback.recycled(mockComponent);
checkMemoryComponent(id, mockComponent);
@@ -223,7 +219,8 @@
IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
Mockito.mock(IIndexCheckpointManagerProvider.class);
IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class);
- Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong(), Mockito.anyLong());
+ Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.anyLong(), Mockito.anyLong(),
+ Mockito.anyLong());
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
return indexCheckpointManagerProvider;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index d4d601c..448613b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.common.LocalResource;
/**
@@ -62,20 +63,19 @@
DatasetResourceReference ref = DatasetResourceReference.of(ls);
final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
indexCheckpointManager.delete();
- // Get most recent timestamp of existing files to avoid deletion
+ // Get most recent sequence of existing files to avoid deletion
Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
if (files == null) {
throw HyracksDataException
.create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
}
- String mostRecentTimestamp = null;
+ long maxComponentSequence = Long.MIN_VALUE;
for (String file : files) {
- String nextTimeStamp = AbstractLSMIndexFileManager.getComponentEndTime(file);
- mostRecentTimestamp = mostRecentTimestamp == null || nextTimeStamp.compareTo(mostRecentTimestamp) > 0
- ? nextTimeStamp : mostRecentTimestamp;
+ maxComponentSequence =
+ Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
}
- indexCheckpointManager.init(mostRecentTimestamp, currentLSN);
+ indexCheckpointManager.init(maxComponentSequence, currentLSN);
}
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index d5dc51d..55dd5d4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -63,8 +63,8 @@
final IIOManager ioManager = appCtx.getIoManager();
final FileReference localPath = ioManager.resolve(componentFile);
final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
- final String componentId = PersistentLocalResourceRepository.getComponentId(componentFile);
- return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId);
+ final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile);
+ return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
}
@Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index a4f9b43..b360a09 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -37,6 +37,7 @@
import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
/**
* A task to mark a replicated LSM component as valid
@@ -57,7 +58,7 @@
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
try {
if (masterLsn == IndexSynchronizer.BULKLOAD_LSN) {
- updateBulkLoadedLastComponentTimestamp(appCtx);
+ updateBulkLoadedLastComponentSequence(appCtx);
} else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
ensureComponentLsnFlushed(appCtx);
}
@@ -70,13 +71,12 @@
}
}
- private void updateBulkLoadedLastComponentTimestamp(INcApplicationContext appCtx) throws HyracksDataException {
+ private void updateBulkLoadedLastComponentSequence(INcApplicationContext appCtx) throws HyracksDataException {
final ResourceReference indexRef = ResourceReference.of(file);
final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
- final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
- indexCheckpointManager.advanceValidComponentTimestamp(componentEndTime);
-
+ final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
+ indexCheckpointManager.advanceValidComponentSequence(componentSequence);
}
private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
@@ -95,8 +95,8 @@
indexCheckpointManager.wait(replicationTimeOut);
replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
- final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
- indexCheckpointManager.replicated(componentEndTime, masterLsn, lastComponentId);
+ final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
+ indexCheckpointManager.replicated(componentSequence, masterLsn, lastComponentId);
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 20663d1..f53d448 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -98,7 +98,7 @@
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
indexCheckpointManager.delete();
- indexCheckpointManager.init(null, currentLSN);
+ indexCheckpointManager.init(Long.MIN_VALUE, currentLSN);
LOGGER.info(() -> "Checkpoint index: " + indexRef);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index f9718c4..c0da095 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -22,7 +22,6 @@
import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
-import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT;
import java.io.File;
import java.io.FilenameFilter;
@@ -30,12 +29,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.text.Format;
import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -71,6 +67,7 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.util.ExitUtil;
@@ -129,9 +126,6 @@
}
};
- private static final ThreadLocal<SimpleDateFormat> THREAD_LOCAL_FORMATTER =
- ThreadLocal.withInitial(() -> new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT));
-
// Finals
private final IIOManager ioManager;
private final Cache<String, LocalResource> resourceCache;
@@ -202,7 +196,7 @@
byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
final Path path = Paths.get(resourceFile.getAbsolutePath());
Files.write(path, bytes);
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(null, 0);
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0);
deleteResourceFileMask(resourceFile);
} catch (Exception e) {
cleanup(resourceFile);
@@ -481,30 +475,18 @@
}
private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
- final Format formatter = THREAD_LOCAL_FORMATTER.get();
final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
if (indexComponentFiles == null) {
throw new IOException(index + " doesn't exist or an IO error occurred");
}
- final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
- if (!validComponentTimestamp.isPresent()) {
- // index doesn't have any valid component, delete all
- for (File componentFile : indexComponentFiles) {
+ final long validComponentSequence = getIndexCheckpointManager(index).getValidComponentSequence();
+ for (File componentFile : indexComponentFiles) {
+ // delete any file with start sequence > valid component sequence
+ final long fileStart = IndexComponentFileReference.of(componentFile.getName()).getSequenceStart();
+ if (fileStart > validComponentSequence) {
LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
Files.delete(componentFile.toPath());
}
- } else {
- final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
- for (File componentFile : indexComponentFiles) {
- // delete any file with startTime > validTimestamp
- final String fileStartTimeStr =
- AbstractLSMIndexFileManager.getComponentStartTime(componentFile.getName());
- final Date fileStartTime = (Date) formatter.parseObject(fileStartTimeStr);
- if (fileStartTime.after(validTimestamp)) {
- LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
- Files.delete(componentFile.toPath());
- }
- }
}
}
@@ -545,8 +527,8 @@
long fileSize = file.length();
totalSize += fileSize;
if (isComponentFile(resolvedPath.getFile(), file.getName())) {
- String componentId = getComponentId(file.getAbsolutePath());
- componentsStats.put(componentId, componentsStats.getOrDefault(componentId, 0L) + fileSize);
+ String componentSeq = getComponentSequence(file.getAbsolutePath());
+ componentsStats.put(componentSeq, componentsStats.getOrDefault(componentSeq, 0L) + fileSize);
}
}
}
@@ -576,17 +558,16 @@
}
/**
- * Gets a component id based on its unique timestamp.
- * e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
- * will return a component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439
+ * Gets a component sequence based on its file name.
+ * e.g. a component file 1_3_b
+ * will return a component sequence 1_3
*
- * @param componentFile
- * any component file
- * @return The component id
+ * @param componentFile any component file
+ * @return The component sequence
*/
- public static String getComponentId(String componentFile) {
+ public static String getComponentSequence(String componentFile) {
final ResourceReference ref = ResourceReference.of(componentFile);
- return ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
+ return IndexComponentFileReference.of(ref.getName()).getSequence();
}
private static boolean isComponentMask(File mask) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 7fbde73..2240fd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -53,21 +54,14 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a flush
+ String baseName = getNextComponentSequence(btreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
-
- String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
- // Get the range of timestamps by taking the earliest and the latest timestamps
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null);
}
@@ -75,17 +69,17 @@
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
// create transaction filter <to hide transaction files>
FilenameFilter transactionFilter = getTransactionFileFilter(false);
// List of valid BTree files.
cleanupAndGetValidFilesInternal(getCompoundFilter(transactionFilter, btreeFilter), btreeFactory, allBTreeFiles,
btreeFactory.getBufferCache());
HashSet<String> btreeFilesSet = new HashSet<>();
- for (ComparableFileName cmpFileName : allBTreeFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+ for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+ int index = cmpFileName.getFileName().lastIndexOf(DELIMITER);
+ btreeFilesSet.add(cmpFileName.getFileName().substring(0, index));
}
if (hasBloomFilter) {
@@ -104,53 +98,51 @@
// Special case: sorting is not required
if (allBTreeFiles.size() == 1 && (!hasBloomFilter || allBloomFilterFiles.size() == 1)) {
- validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef, null,
- hasBloomFilter ? allBloomFilterFiles.get(0).fileRef : null));
+ validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).getFileRef(), null,
+ hasBloomFilter ? allBloomFilterFiles.get(0).getFileRef() : null));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts file names from earliest to latest sequence.
Collections.sort(allBTreeFiles);
if (hasBloomFilter) {
Collections.sort(allBloomFilterFiles);
}
- List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
- ComparableFileName lastBTree = allBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
validComparableBTreeFiles.add(lastBTree);
- List<ComparableFileName> validComparableBloomFilterFiles = null;
- ComparableFileName lastBloomFilter = null;
+ List<IndexComponentFileReference> validComparableBloomFilterFiles = null;
+ IndexComponentFileReference lastBloomFilter = null;
if (hasBloomFilter) {
validComparableBloomFilterFiles = new ArrayList<>();
lastBloomFilter = allBloomFilterFiles.get(0);
validComparableBloomFilterFiles.add(lastBloomFilter);
}
- ComparableFileName currentBTree;
- ComparableFileName currentBloomFilter = null;
+ IndexComponentFileReference currentBTree;
+ IndexComponentFileReference currentBloomFilter = null;
for (int i = 1; i < allBTreeFiles.size(); i++) {
currentBTree = allBTreeFiles.get(i);
if (hasBloomFilter) {
currentBloomFilter = allBloomFilterFiles.get(i);
}
- // Current start timestamp is greater than last stop timestamp.
- if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
- && (!hasBloomFilter || currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0)) {
+ // Current start sequence is greater than last stop sequence.
+ if (currentBTree.isMoreRecentThan(lastBTree)
+ && (!hasBloomFilter || currentBloomFilter.isMoreRecentThan(lastBloomFilter))) {
validComparableBTreeFiles.add(currentBTree);
lastBTree = currentBTree;
if (hasBloomFilter) {
validComparableBloomFilterFiles.add(currentBloomFilter);
lastBloomFilter = currentBloomFilter;
}
- } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
- && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
- && (!hasBloomFilter || (currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
- && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0))) {
+ } else if (currentBTree.isWithin(lastBTree)
+ && (!hasBloomFilter || currentBloomFilter.isWithin(lastBloomFilter))) {
// Invalid files are completely contained in last interval.
- delete(btreeFactory.getBufferCache(), currentBTree.fullPath);
+ delete(btreeFactory.getBufferCache(), currentBTree.getFullPath());
if (hasBloomFilter) {
- delete(btreeFactory.getBufferCache(), currentBloomFilter.fullPath);
+ delete(btreeFactory.getBufferCache(), currentBloomFilter.getFullPath());
}
} else {
// This scenario should not be possible.
@@ -161,21 +153,21 @@
// Sort valid files in reverse lexicographical order, such that newer
// files come first.
Collections.sort(validComparableBTreeFiles, recencyCmp);
- Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
- Iterator<ComparableFileName> bloomFilterFileIter = null;
+ Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> bloomFilterFileIter = null;
if (hasBloomFilter) {
Collections.sort(validComparableBloomFilterFiles, recencyCmp);
bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
}
- ComparableFileName cmpBTreeFileName = null;
- ComparableFileName cmpBloomFilterFileName = null;
+ IndexComponentFileReference cmpBTreeFileName = null;
+ IndexComponentFileReference cmpBloomFilterFileName = null;
while (btreeFileIter.hasNext() && (hasBloomFilter ? bloomFilterFileIter.hasNext() : true)) {
cmpBTreeFileName = btreeFileIter.next();
if (hasBloomFilter) {
cmpBloomFilterFileName = bloomFilterFileIter.next();
}
- validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, null,
- hasBloomFilter ? cmpBloomFilterFileName.fileRef : null));
+ validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.getFileRef(), null,
+ hasBloomFilter ? cmpBloomFilterFileName.getFileRef() : null));
}
return validFiles;
@@ -183,11 +175,10 @@
@Override
public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
- String ts = getCurrentTimestamp();
+ String sequence = getNextComponentSequence(btreeFilter);
// Create transaction lock file
- IoUtil.create(baseDir.getChild(TXN_PREFIX + ts));
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a transaction
+ IoUtil.create(baseDir.getChild(TXN_PREFIX + sequence));
+ String baseName = getNextComponentSequence(btreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null,
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
index 23c2367..8fb3751 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -58,23 +59,15 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a flush
+ String baseName = getNextComponentSequence(btreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
-
- String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
- // Get the range of timestamps by taking the earliest and the latest
- // timestamps
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -83,18 +76,17 @@
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBuddyBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBuddyBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
// Create transaction file filter
FilenameFilter transactionFilefilter = getTransactionFileFilter(false);
// Gather files.
cleanupAndGetValidFilesInternal(getCompoundFilter(btreeFilter, transactionFilefilter), btreeFactory,
allBTreeFiles, btreeFactory.getBufferCache());
HashSet<String> btreeFilesSet = new HashSet<>();
- for (ComparableFileName cmpFileName : allBTreeFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+ for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+ btreeFilesSet.add(cmpFileName.getSequence());
}
validateFiles(btreeFilesSet, allBuddyBTreeFiles, getCompoundFilter(buddyBtreeFilter, transactionFilefilter),
buddyBtreeFactory, btreeFactory.getBufferCache());
@@ -109,52 +101,47 @@
return validFiles;
}
if (allBTreeFiles.size() == 1 && allBuddyBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef,
- allBuddyBTreeFiles.get(0).fileRef, allBloomFilterFiles.get(0).fileRef));
+ validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).getFileRef(),
+ allBuddyBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts file names from earliest to latest sequence.
Collections.sort(allBTreeFiles);
Collections.sort(allBuddyBTreeFiles);
Collections.sort(allBloomFilterFiles);
- List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
- ComparableFileName lastBTree = allBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
validComparableBTreeFiles.add(lastBTree);
- List<ComparableFileName> validComparableBuddyBTreeFiles = new ArrayList<>();
- ComparableFileName lastBuddyBTree = allBuddyBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableBuddyBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastBuddyBTree = allBuddyBTreeFiles.get(0);
validComparableBuddyBTreeFiles.add(lastBuddyBTree);
- List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
- ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+ List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+ IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
validComparableBloomFilterFiles.add(lastBloomFilter);
for (int i = 1; i < allBTreeFiles.size(); i++) {
- ComparableFileName currentBTree = allBTreeFiles.get(i);
- ComparableFileName currentBuddyBTree = allBuddyBTreeFiles.get(i);
- ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
- // Current start timestamp is greater than last stop timestamp.
- if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
- && currentBuddyBTree.interval[0].compareTo(lastBuddyBTree.interval[1]) > 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+ IndexComponentFileReference currentBTree = allBTreeFiles.get(i);
+ IndexComponentFileReference currentBuddyBTree = allBuddyBTreeFiles.get(i);
+ IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+ // Current start sequence is greater than last stop sequence
+ if (currentBTree.isMoreRecentThan(lastBTree) && currentBuddyBTree.isMoreRecentThan(lastBuddyBTree)
+ && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
validComparableBTreeFiles.add(currentBTree);
validComparableBuddyBTreeFiles.add(currentBuddyBTree);
validComparableBloomFilterFiles.add(currentBloomFilter);
lastBTree = currentBTree;
lastBuddyBTree = currentBuddyBTree;
lastBloomFilter = currentBloomFilter;
- } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
- && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
- && currentBuddyBTree.interval[0].compareTo(lastBuddyBTree.interval[0]) >= 0
- && currentBuddyBTree.interval[1].compareTo(lastBuddyBTree.interval[1]) <= 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
- && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
- // Invalid files are completely contained in last interval.
- delete(treeFactory.getBufferCache(), currentBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBuddyBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+ } else if (currentBTree.isWithin(lastBTree) && currentBuddyBTree.isWithin(lastBuddyBTree)
+ && currentBloomFilter.isWithin(lastBloomFilter)) {
+ // Invalid files are completely contained in last sequence.
+ delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBuddyBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
} else {
// This scenario should not be possible.
throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -163,19 +150,19 @@
// Sort valid files in reverse lexicographical order, such that newer
// files come first.
- Collections.sort(validComparableBTreeFiles, recencyCmp);
- Collections.sort(validComparableBuddyBTreeFiles, recencyCmp);
- Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+ validComparableBTreeFiles.sort(recencyCmp);
+ validComparableBuddyBTreeFiles.sort(recencyCmp);
+ validComparableBloomFilterFiles.sort(recencyCmp);
- Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
- Iterator<ComparableFileName> buddyBtreeFileIter = validComparableBuddyBTreeFiles.iterator();
- Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+ Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> buddyBtreeFileIter = validComparableBuddyBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
while (btreeFileIter.hasNext() && buddyBtreeFileIter.hasNext()) {
- ComparableFileName cmpBTreeFileName = btreeFileIter.next();
- ComparableFileName cmpBuddyBTreeFileName = buddyBtreeFileIter.next();
- ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, cmpBuddyBTreeFileName.fileRef,
- cmpBloomFilterFileName.fileRef));
+ IndexComponentFileReference cmpBTreeFileName = btreeFileIter.next();
+ IndexComponentFileReference cmpBuddyBTreeFileName = buddyBtreeFileIter.next();
+ IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+ validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.getFileRef(),
+ cmpBuddyBTreeFileName.getFileRef(), cmpBloomFilterFileName.getFileRef()));
}
return validFiles;
@@ -183,10 +170,9 @@
@Override
public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
- String ts = getCurrentTimestamp();
// Create transaction lock file
- Files.createFile(Paths.get(baseDir + TXN_PREFIX + ts));
- String baseName = ts + DELIMITER + ts;
+ String baseName = getNextComponentSequence(btreeFilter);
+ Files.createFile(Paths.get(baseDir + TXN_PREFIX + baseName));
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
index e6aa2d1..0af06d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -39,4 +39,10 @@
*/
int getCurrentComponentIndex();
+ /**
+ * Initializes this {@link ILSMComponentIdGenerator} by setting the last used id
+ *
+ * @param lastUsedId the last used component id
+ */
+ void init(long lastUsedId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 57fd01d..1f481c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -21,12 +21,9 @@
import java.io.FilenameFilter;
import java.io.IOException;
-import java.text.Format;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.Date;
import java.util.HashSet;
import java.util.List;
@@ -42,7 +39,9 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+@NotThreadSafe
public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManager {
public enum TreeIndexState {
@@ -76,22 +75,18 @@
*/
public static final String TXN_PREFIX = ".T";
- public static final String COMPONENT_TIMESTAMP_FORMAT = "yyyy-MM-dd-HH-mm-ss-SSS";
-
public static final FilenameFilter COMPONENT_FILES_FILTER = (dir, name) -> !name.startsWith(".");
protected static final FilenameFilter txnFileNameFilter = (dir, name) -> name.startsWith(TXN_PREFIX);
protected static FilenameFilter bloomFilterFilter =
(dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX);
- protected static final FilenameFilter dummyFilter = (dir, name) -> true;
protected static final Comparator<String> cmp = new FileNameComparator();
+ private static final FilenameFilter dummyFilter = (dir, name) -> true;
protected final IIOManager ioManager;
// baseDir should reflect dataset name and partition name and be absolute
protected final FileReference baseDir;
- protected final Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT);
- protected final Comparator<ComparableFileName> recencyCmp = new RecencyComparator();
+ protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator();
protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
- private String prevTimestamp = null;
public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeFactory) {
@@ -131,18 +126,18 @@
}
protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
- TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles,
+ TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
IBufferCache bufferCache) throws HyracksDataException {
String[] files = listDirFiles(baseDir, filter);
for (String fileName : files) {
FileReference fileRef = baseDir.getChild(fileName);
if (treeFactory == null) {
- allFiles.add(new ComparableFileName(fileRef));
+ allFiles.add(IndexComponentFileReference.of(fileRef));
continue;
}
TreeIndexState idxState = isValidTreeIndex(treeFactory.createIndexInstance(fileRef));
if (idxState == TreeIndexState.VALID) {
- allFiles.add(new ComparableFileName(fileRef));
+ allFiles.add(IndexComponentFileReference.of(fileRef));
} else if (idxState == TreeIndexState.INVALID) {
bufferCache.deleteFile(fileRef);
}
@@ -167,18 +162,16 @@
return files;
}
- protected void validateFiles(HashSet<String> groundTruth, ArrayList<ComparableFileName> validFiles,
+ protected void validateFiles(HashSet<String> groundTruth, ArrayList<IndexComponentFileReference> validFiles,
FilenameFilter filter, TreeIndexFactory<? extends ITreeIndex> treeFactory, IBufferCache bufferCache)
throws HyracksDataException {
- ArrayList<ComparableFileName> tmpAllInvListsFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> tmpAllInvListsFiles = new ArrayList<>();
cleanupAndGetValidFilesInternal(filter, treeFactory, tmpAllInvListsFiles, bufferCache);
- for (ComparableFileName cmpFileName : tmpAllInvListsFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- String file = cmpFileName.fileName.substring(0, index);
- if (groundTruth.contains(file)) {
+ for (IndexComponentFileReference cmpFileName : tmpAllInvListsFiles) {
+ if (groundTruth.contains(cmpFileName.getSequence())) {
validFiles.add(cmpFileName);
} else {
- delete(bufferCache, cmpFileName.fullPath);
+ delete(bufferCache, cmpFileName.getFullPath());
}
}
}
@@ -198,30 +191,20 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- // Begin timestamp and end timestamp are identical since it is a flush
- return new LSMComponentFileReferences(baseDir.getChild(ts + DELIMITER + ts), null, null);
+ final String sequence = getNextComponentSequence(COMPONENT_FILES_FILTER);
+ return new LSMComponentFileReferences(baseDir.getChild(sequence), null, null);
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
- String start = firstTimestampRange[0];
- String end = lastTimestampRange[1];
- if (end.compareTo(start) <= 0) {
- throw new IllegalArgumentException(
- "A Merge file must have end greater than start. Found end: " + end + " and start: " + start);
- }
- // Get the range of timestamps by taking the earliest and the latest timestamps
- return new LSMComponentFileReferences(baseDir.getChild(start + DELIMITER + end), null, null);
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
+ return new LSMComponentFileReferences(baseDir.getChild(baseName), null, null);
}
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allFiles = new ArrayList<>();
// Gather files and delete invalid files
// There are two types of invalid files:
@@ -235,40 +218,37 @@
}
if (allFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
+ validFiles.add(new LSMComponentFileReferences(allFiles.get(0).getFileRef(), null, null));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts files names from earliest to latest
Collections.sort(allFiles);
- List<ComparableFileName> validComparableFiles = new ArrayList<>();
- ComparableFileName last = allFiles.get(0);
+ List<IndexComponentFileReference> validComparableFiles = new ArrayList<>();
+ IndexComponentFileReference last = allFiles.get(0);
validComparableFiles.add(last);
for (int i = 1; i < allFiles.size(); i++) {
- ComparableFileName current = allFiles.get(i);
- // The current start timestamp is greater than last stop timestamp so current is valid.
- if (current.interval[0].compareTo(last.interval[1]) > 0) {
+ IndexComponentFileReference current = allFiles.get(i);
+ if (current.isMoreRecentThan(last)) {
+ // The current start sequence is greater than last stop sequence so current is valid.
validComparableFiles.add(current);
last = current;
- } else if (current.interval[0].compareTo(last.interval[0]) >= 0
- && current.interval[1].compareTo(last.interval[1]) <= 0) {
+ } else if (current.isWithin(last)) {
// The current file is completely contained in the interval of the
// last file. Thus the last file must contain at least as much information
// as the current file, so delete the current file.
- delete(treeFactory.getBufferCache(), current.fullPath);
+ delete(treeFactory.getBufferCache(), current.getFullPath());
} else {
// This scenario should not be possible since timestamps are monotonically increasing.
throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
}
}
-
// Sort valid files in reverse lexicographical order, such that newer files come first.
- Collections.sort(validComparableFiles, recencyCmp);
- for (ComparableFileName cmpFileName : validComparableFiles) {
- validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
+ validComparableFiles.sort(recencyCmp);
+ for (IndexComponentFileReference cmpFileName : validComparableFiles) {
+ validFiles.add(new LSMComponentFileReferences(cmpFileName.getFileRef(), null, null));
}
-
return validFiles;
}
@@ -287,8 +267,7 @@
private static class FileNameComparator implements Comparator<String> {
@Override
public int compare(String a, String b) {
- // Consciously ignoring locale.
- return -a.compareTo(b);
+ return IndexComponentFileReference.of(b).compareTo(IndexComponentFileReference.of(a));
}
}
@@ -309,45 +288,14 @@
}
}
- protected class ComparableFileName implements Comparable<ComparableFileName> {
- public final FileReference fileRef;
- public final String fullPath;
- public final String fileName;
-
- // Timestamp interval.
- public final String[] interval;
-
- public ComparableFileName(FileReference fileRef) {
- this.fileRef = fileRef;
- this.fullPath = fileRef.getFile().getAbsolutePath();
- this.fileName = fileRef.getFile().getName();
- interval = fileName.split(DELIMITER);
- }
-
+ private static class RecencyComparator implements Comparator<IndexComponentFileReference> { // newest first
@Override
- public int compareTo(ComparableFileName b) {
- int startCmp = interval[0].compareTo(b.interval[0]);
+ public int compare(IndexComponentFileReference a, IndexComponentFileReference b) {
+ int startCmp = Long.compare(b.getSequenceStart(), a.getSequenceStart()); // operands swapped: descending start
if (startCmp != 0) {
return startCmp;
}
- return b.interval[1].compareTo(interval[1]);
- }
-
- @Override
- public String toString() {
- return "{\"type\" : \"" + (interval[0].equals(interval[1]) ? "flush" : "merge") + "\", \"start\" : \""
- + interval[0] + "\", \"end\" : \"" + interval[1] + "\"}";
- }
- }
-
- private class RecencyComparator implements Comparator<ComparableFileName> {
- @Override
- public int compare(ComparableFileName a, ComparableFileName b) {
- int cmp = -a.interval[0].compareTo(b.interval[0]);
- if (cmp != 0) {
- return cmp;
- }
- return -a.interval[1].compareTo(b.interval[1]);
+ return Long.compare(b.getSequenceEnd(), a.getSequenceEnd()); // equal starts: descending end
}
}
@@ -382,10 +330,10 @@
return null;
}
- protected static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) {
+ private static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) {
final String timeStamp =
transactionFileName.substring(transactionFileName.indexOf(TXN_PREFIX) + TXN_PREFIX.length());
- return (dir, name) -> inclusive ? name.startsWith(timeStamp) : !name.startsWith(timeStamp);
+ return (dir, name) -> inclusive == name.startsWith(timeStamp);
}
protected FilenameFilter getTransactionFileFilter(boolean inclusive) throws HyracksDataException {
@@ -406,34 +354,12 @@
return (dir, name) -> filter1.accept(dir, name) && filter2.accept(dir, name);
}
- /**
- * @return The string format of the current timestamp.
- * The returned results of this method are guaranteed to not have duplicates.
- */
- protected String getCurrentTimestamp() {
- Date date = new Date();
- String ts = formatter.format(date);
- /**
- * prevent a corner case where the same timestamp can be given.
- */
- while (prevTimestamp != null && ts.compareTo(prevTimestamp) == 0) {
- try {
- Thread.sleep(1);
- date = new Date();
- ts = formatter.format(date);
- } catch (InterruptedException e) {
- //ignore
- }
+ protected String getNextComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
+ long maxComponentSeq = -1; // seed of -1: if the loop below sees no file, the returned sequence is 0
+ final String[] files = listDirFiles(baseDir, filenameFilter);
+ for (String fileName : files) {
+ maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd());
}
- prevTimestamp = ts;
- return ts;
- }
-
- public static String getComponentStartTime(String fileName) {
- return fileName.split(DELIMITER)[0];
- }
-
- public static String getComponentEndTime(String fileName) {
- return fileName.split(DELIMITER)[1];
+ return IndexComponentFileReference.getFlushSequence(maxComponentSeq + 1); // one past the highest end found
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java
new file mode 100644
index 0000000..bbadf60
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.DELIMITER;
+
+import java.util.Objects;
+
+import org.apache.hyracks.api.io.FileReference;
+
+/**
+ * A parsed reference to an LSM index component file whose name starts with
+ * {@code <sequenceStart><DELIMITER><sequenceEnd>}. Any further delimiter-separated parts of the
+ * name (e.g. a file-type suffix) are ignored when parsing.
+ * <p>
+ * Natural ordering is by ascending sequence start and, for equal starts, by descending sequence
+ * end. {@link #equals(Object)} compares file names only, so the ordering is not consistent with equals.
+ */
+public class IndexComponentFileReference implements Comparable<IndexComponentFileReference> {
+
+    private FileReference fileRef;
+    private String fullPath;
+    private String fileName;
+    private long sequenceStart;
+    private long sequenceEnd;
+
+    private IndexComponentFileReference() {
+    }
+
+    /**
+     * Parses a component file name.
+     *
+     * @param file the file name, expected to start with {@code <start><DELIMITER><end>}
+     * @return a reference holding the parsed sequence range; no {@link FileReference} or full path is set
+     * @throws IllegalArgumentException if the name has fewer than two delimiter-separated parts or a
+     *             sequence is not a number (the latter as {@link NumberFormatException})
+     */
+    public static IndexComponentFileReference of(String file) {
+        final String[] splits = file.split(DELIMITER);
+        if (splits.length < 2) {
+            // report the offending name rather than failing with ArrayIndexOutOfBoundsException
+            throw new IllegalArgumentException("Invalid index component file name: " + file);
+        }
+        final IndexComponentFileReference ref = new IndexComponentFileReference();
+        ref.fileName = file;
+        ref.sequenceStart = Long.parseLong(splits[0]);
+        ref.sequenceEnd = Long.parseLong(splits[1]);
+        return ref;
+    }
+
+    /**
+     * Parses the name of {@code fileRef} and retains the reference and its absolute path.
+     *
+     * @param fileRef the component file
+     * @return the parsed reference
+     * @throws IllegalArgumentException if the file name is not a valid component file name
+     */
+    public static IndexComponentFileReference of(FileReference fileRef) {
+        final IndexComponentFileReference ref = of(fileRef.getFile().getName());
+        ref.fileRef = fileRef;
+        ref.fullPath = fileRef.getFile().getAbsolutePath();
+        return ref;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        IndexComponentFileReference that = (IndexComponentFileReference) o;
+        return Objects.equals(fileName, that.fileName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fileName);
+    }
+
+    @Override
+    public int compareTo(IndexComponentFileReference o) {
+        int startCmp = Long.compare(sequenceStart, o.sequenceStart);
+        if (startCmp != 0) {
+            return startCmp;
+        }
+        // equal starts: the reference with the larger end sorts first
+        return Long.compare(o.sequenceEnd, sequenceEnd);
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public long getSequenceStart() {
+        return sequenceStart;
+    }
+
+    public long getSequenceEnd() {
+        return sequenceEnd;
+    }
+
+    /**
+     * @return the absolute path, or {@code null} if created via {@link #of(String)}
+     */
+    public String getFullPath() {
+        return fullPath;
+    }
+
+    /**
+     * @return the file reference, or {@code null} if created via {@link #of(String)}
+     */
+    public FileReference getFileRef() {
+        return fileRef;
+    }
+
+    /**
+     * @return the sequence range formatted as {@code <start><DELIMITER><end>}
+     */
+    public String getSequence() {
+        return sequenceStart + DELIMITER + sequenceEnd;
+    }
+
+    /**
+     * @return {@code true} if this range starts after the end of {@code other}'s range
+     */
+    public boolean isMoreRecentThan(IndexComponentFileReference other) {
+        return sequenceStart > other.getSequenceEnd();
+    }
+
+    /**
+     * @return {@code true} if this range lies inside {@code other}'s range, bounds included
+     */
+    public boolean isWithin(IndexComponentFileReference other) {
+        return sequenceStart >= other.getSequenceStart() && sequenceEnd <= other.getSequenceEnd();
+    }
+
+    @Override
+    public String toString() {
+        return "{\"type\" : \"" + (isFlush() ? "flush" : "merge") + "\", \"start\" : \"" + sequenceStart
+                + "\", \"end\" : \"" + sequenceEnd + "\"}";
+    }
+
+    private boolean isFlush() {
+        return sequenceStart == sequenceEnd;
+    }
+
+    /**
+     * @return the sequence of a flushed component: start and end are both {@code componentSequence}
+     */
+    public static String getFlushSequence(long componentSequence) {
+        return componentSequence + DELIMITER + componentSequence;
+    }
+
+    /**
+     * Builds the sequence of a merged component, spanning from the start of the first component to
+     * the end of the last one. The two names are parsed as in {@link #of(String)}.
+     */
+    public static String getMergeSequence(String firstComponentName, String lastComponentName) {
+        long mergeSequenceStart = IndexComponentFileReference.of(firstComponentName).getSequenceStart();
+        long mergeSequenceEnd = IndexComponentFileReference.of(lastComponentName).getSequenceEnd();
+        return mergeSequenceStart + DELIMITER + mergeSequenceEnd;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
index c7990a6..cf6c4a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -24,12 +24,14 @@
public class LSMComponentId implements ILSMComponentId {
public static final long NOT_FOUND = -1;
+ public static final long MIN_VALID_COMPONENT_ID = 0;
// Used to represent an empty index with no components
public static final LSMComponentId EMPTY_INDEX_LAST_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
// A default component id used for bulk loaded component
- public static final LSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+ public static final LSMComponentId DEFAULT_COMPONENT_ID =
+ new LSMComponentId(MIN_VALID_COMPONENT_ID, MIN_VALID_COMPONENT_ID);
private long minId;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
index 3da57fd..e6bf0ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -21,55 +21,52 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.util.annotations.ThreadSafe;
/**
* A default implementation of {@link ILSMComponentIdGenerator}.
- *
*/
+@ThreadSafe
public class LSMComponentIdGenerator implements ILSMComponentIdGenerator {
private final int numComponents;
private int currentComponentIndex;
- protected long previousTimestamp = -1L;
+ private long lastUsedId; // most recently issued id; refresh() increments it to produce the next one
private ILSMComponentId componentId;
+ private boolean initialized = false; // set by init(); refresh() and getId() throw until then
public LSMComponentIdGenerator(int numComponents) {
this.numComponents = numComponents;
+ }
+
+ @Override
+ public synchronized void init(long lastUsedId) {
+ this.lastUsedId = lastUsedId;
+ initialized = true;
refresh();
currentComponentIndex = 0;
}
@Override
- public void refresh() {
- long ts = getCurrentTimestamp();
- componentId = new LSMComponentId(ts, ts);
+ public synchronized void refresh() {
+ if (!initialized) {
+ throw new IllegalStateException("Attempt to refresh component id before initialization.");
+ }
+ final long nextId = ++lastUsedId;
+ componentId = new LSMComponentId(nextId, nextId);
currentComponentIndex = (currentComponentIndex + 1) % numComponents;
}
@Override
- public ILSMComponentId getId() {
+ public synchronized ILSMComponentId getId() {
+ if (!initialized) {
+ throw new IllegalStateException("Attempt to get component id before initialization.");
+ }
return componentId;
}
@Override
- public int getCurrentComponentIndex() {
+ public synchronized int getCurrentComponentIndex() {
return currentComponentIndex;
}
-
- protected long getCurrentTimestamp() {
- long timestamp = System.currentTimeMillis();
- while (timestamp <= previousTimestamp) {
- // make sure timestamp is strictly increasing
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- timestamp = System.currentTimeMillis();
- }
- previousTimestamp = timestamp;
- return timestamp;
-
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
index 2f1eb87..4471102 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.BTreeFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
@@ -57,21 +58,15 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a flush
+ String baseName = getNextComponentSequence(deletedKeysBTreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
- String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
- // Get the range of timestamps by taking the earliest and the latest timestamps
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -80,18 +75,17 @@
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allDictBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allInvListsFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allDeletedKeysBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allDictBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allInvListsFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allDeletedKeysBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
// Gather files.
cleanupAndGetValidFilesInternal(deletedKeysBTreeFilter, btreeFactory, allDeletedKeysBTreeFiles,
btreeFactory.getBufferCache());
HashSet<String> deletedKeysBTreeFilesSet = new HashSet<>();
- for (ComparableFileName cmpFileName : allDeletedKeysBTreeFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- deletedKeysBTreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+ for (IndexComponentFileReference cmpFileName : allDeletedKeysBTreeFiles) {
+ deletedKeysBTreeFilesSet.add(cmpFileName.getSequence());
}
// TODO: do we really need to validate the inverted lists files or is validating the dict. BTrees is enough?
@@ -116,52 +110,48 @@
if (allDictBTreeFiles.size() == 1 && allInvListsFiles.size() == 1 && allDeletedKeysBTreeFiles.size() == 1
&& allBloomFilterFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).fileRef,
- allDeletedKeysBTreeFiles.get(0).fileRef, allBloomFilterFiles.get(0).fileRef));
+ validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).getFileRef(),
+ allDeletedKeysBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts files names from earliest to latest sequence.
Collections.sort(allDeletedKeysBTreeFiles);
Collections.sort(allDictBTreeFiles);
Collections.sort(allBloomFilterFiles);
- List<ComparableFileName> validComparableDictBTreeFiles = new ArrayList<>();
- ComparableFileName lastDictBTree = allDictBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableDictBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastDictBTree = allDictBTreeFiles.get(0);
validComparableDictBTreeFiles.add(lastDictBTree);
- List<ComparableFileName> validComparableDeletedKeysBTreeFiles = new ArrayList<>();
- ComparableFileName lastDeletedKeysBTree = allDeletedKeysBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableDeletedKeysBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastDeletedKeysBTree = allDeletedKeysBTreeFiles.get(0);
validComparableDeletedKeysBTreeFiles.add(lastDeletedKeysBTree);
- List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
- ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+ List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+ IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
validComparableBloomFilterFiles.add(lastBloomFilter);
for (int i = 1; i < allDictBTreeFiles.size(); i++) {
- ComparableFileName currentDeletedKeysBTree = allDeletedKeysBTreeFiles.get(i);
- ComparableFileName currentDictBTree = allDictBTreeFiles.get(i);
- ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
- // Current start timestamp is greater than last stop timestamp.
- if (currentDeletedKeysBTree.interval[0].compareTo(lastDeletedKeysBTree.interval[1]) > 0
- && currentDictBTree.interval[0].compareTo(lastDictBTree.interval[1]) > 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+ IndexComponentFileReference currentDeletedKeysBTree = allDeletedKeysBTreeFiles.get(i);
+ IndexComponentFileReference currentDictBTree = allDictBTreeFiles.get(i);
+ IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+ // Current start sequence is greater than last stop sequence.
+ if (currentDeletedKeysBTree.isMoreRecentThan(lastDeletedKeysBTree)
+ && currentDictBTree.isMoreRecentThan(lastDictBTree)
+ && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
validComparableDictBTreeFiles.add(currentDictBTree);
validComparableDeletedKeysBTreeFiles.add(currentDeletedKeysBTree);
validComparableBloomFilterFiles.add(currentBloomFilter);
lastDictBTree = currentDictBTree;
lastDeletedKeysBTree = currentDeletedKeysBTree;
lastBloomFilter = currentBloomFilter;
- } else if (currentDeletedKeysBTree.interval[0].compareTo(lastDeletedKeysBTree.interval[0]) >= 0
- && currentDeletedKeysBTree.interval[1].compareTo(lastDeletedKeysBTree.interval[1]) <= 0
- && currentDictBTree.interval[0].compareTo(lastDictBTree.interval[0]) >= 0
- && currentDictBTree.interval[1].compareTo(lastDictBTree.interval[1]) <= 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
- && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
- // Invalid files are completely contained in last interval.
- delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentDictBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+ } else if (currentDeletedKeysBTree.isWithin(lastDeletedKeysBTree)
+ && currentDictBTree.isWithin(lastDictBTree) && currentBloomFilter.isWithin(lastBloomFilter)) {
+ // Invalid files are completely contained in last sequence.
+ delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentDictBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
} else {
// This scenario should not be possible.
throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -170,21 +160,20 @@
// Sort valid files in reverse lexicographical order, such that newer
// files come first.
- Collections.sort(validComparableDictBTreeFiles, recencyCmp);
- Collections.sort(validComparableDeletedKeysBTreeFiles, recencyCmp);
- Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+ validComparableDictBTreeFiles.sort(recencyCmp);
+ validComparableDeletedKeysBTreeFiles.sort(recencyCmp);
+ validComparableBloomFilterFiles.sort(recencyCmp);
- Iterator<ComparableFileName> dictBTreeFileIter = validComparableDictBTreeFiles.iterator();
- Iterator<ComparableFileName> deletedKeysBTreeIter = validComparableDeletedKeysBTreeFiles.iterator();
- Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+ Iterator<IndexComponentFileReference> dictBTreeFileIter = validComparableDictBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> deletedKeysBTreeIter = validComparableDeletedKeysBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
while (dictBTreeFileIter.hasNext() && deletedKeysBTreeIter.hasNext()) {
- ComparableFileName cmpDictBTreeFile = dictBTreeFileIter.next();
- ComparableFileName cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
- ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef,
- cmpBloomFilterFileName.fileRef));
+ IndexComponentFileReference cmpDictBTreeFile = dictBTreeFileIter.next();
+ IndexComponentFileReference cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next();
+ IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+ validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.getFileRef(),
+ cmpDeletedKeysBTreeFile.getFileRef(), cmpBloomFilterFileName.getFileRef()));
}
-
return validFiles;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 2512776..3348407 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
@@ -58,22 +59,15 @@
@Override
public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException {
- String ts = getCurrentTimestamp();
- String baseName = ts + DELIMITER + ts;
- // Begin timestamp and end timestamp are identical since it is a flush
+ String baseName = getNextComponentSequence(btreeFilter);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
}
@Override
- public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
- throws HyracksDataException {
- String[] firstTimestampRange = firstFileName.split(DELIMITER);
- String[] lastTimestampRange = lastFileName.split(DELIMITER);
- String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
- // Get the range of timestamps by taking the earliest and the latest
- // timestamps
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+ final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -82,9 +76,9 @@
@Override
public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
List<LSMComponentFileReferences> validFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allRTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
- ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allRTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+ ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();
// Create a transaction filter <- to hide transaction components->
FilenameFilter transactionFilter = getTransactionFileFilter(false);
@@ -93,9 +87,8 @@
cleanupAndGetValidFilesInternal(getCompoundFilter(transactionFilter, btreeFilter), btreeFactory, allBTreeFiles,
btreeFactory.getBufferCache());
HashSet<String> btreeFilesSet = new HashSet<>();
- for (ComparableFileName cmpFileName : allBTreeFiles) {
- int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
- btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+ for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+ btreeFilesSet.add(cmpFileName.getSequence());
}
validateFiles(btreeFilesSet, allRTreeFiles, getCompoundFilter(transactionFilter, rtreeFilter), rtreeFactory,
btreeFactory.getBufferCache());
@@ -113,52 +106,47 @@
}
if (allRTreeFiles.size() == 1 && allBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
- validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef,
- allBloomFilterFiles.get(0).fileRef));
+ validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).getFileRef(),
+ allBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
return validFiles;
}
- // Sorts files names from earliest to latest timestamp.
+ // Sorts file names from earliest to latest sequence.
Collections.sort(allRTreeFiles);
Collections.sort(allBTreeFiles);
Collections.sort(allBloomFilterFiles);
- List<ComparableFileName> validComparableRTreeFiles = new ArrayList<>();
- ComparableFileName lastRTree = allRTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableRTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastRTree = allRTreeFiles.get(0);
validComparableRTreeFiles.add(lastRTree);
- List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
- ComparableFileName lastBTree = allBTreeFiles.get(0);
+ List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+ IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
validComparableBTreeFiles.add(lastBTree);
- List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
- ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+ List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+ IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
validComparableBloomFilterFiles.add(lastBloomFilter);
for (int i = 1; i < allRTreeFiles.size(); i++) {
- ComparableFileName currentRTree = allRTreeFiles.get(i);
- ComparableFileName currentBTree = allBTreeFiles.get(i);
- ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
- // Current start timestamp is greater than last stop timestamp.
- if (currentRTree.interval[0].compareTo(lastRTree.interval[1]) > 0
- && currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+ IndexComponentFileReference currentRTree = allRTreeFiles.get(i);
+ IndexComponentFileReference currentBTree = allBTreeFiles.get(i);
+ IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+ // Current start sequence is greater than last stop sequence.
+ if (currentRTree.isMoreRecentThan(lastRTree) && currentBTree.isMoreRecentThan(lastBTree)
+ && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
validComparableRTreeFiles.add(currentRTree);
validComparableBTreeFiles.add(currentBTree);
validComparableBloomFilterFiles.add(currentBloomFilter);
lastRTree = currentRTree;
lastBTree = currentBTree;
lastBloomFilter = currentBloomFilter;
- } else if (currentRTree.interval[0].compareTo(lastRTree.interval[0]) >= 0
- && currentRTree.interval[1].compareTo(lastRTree.interval[1]) <= 0
- && currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
- && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
- && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
- && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
- // Invalid files are completely contained in last interval.
- delete(treeFactory.getBufferCache(), currentRTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBTree.fullPath);
- delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+ } else if (currentRTree.isWithin(lastRTree) && currentBTree.isWithin(lastBTree)
+ && currentBloomFilter.isWithin(lastBloomFilter)) {
+ // Invalid files are completely contained in the sequence range of the last valid file.
+ delete(treeFactory.getBufferCache(), currentRTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
+ delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
} else {
// This scenario should not be possible.
throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -167,29 +155,28 @@
// Sort valid files in reverse lexicographical order, such that newer
// files come first.
- Collections.sort(validComparableRTreeFiles, recencyCmp);
- Collections.sort(validComparableBTreeFiles, recencyCmp);
- Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+ validComparableRTreeFiles.sort(recencyCmp);
+ validComparableBTreeFiles.sort(recencyCmp);
+ validComparableBloomFilterFiles.sort(recencyCmp);
- Iterator<ComparableFileName> rtreeFileIter = validComparableRTreeFiles.iterator();
- Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
- Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+ Iterator<IndexComponentFileReference> rtreeFileIter = validComparableRTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+ Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
while (rtreeFileIter.hasNext() && btreeFileIter.hasNext()) {
- ComparableFileName cmpRTreeFileName = rtreeFileIter.next();
- ComparableFileName cmpBTreeFileName = btreeFileIter.next();
- ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
- validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef,
- cmpBloomFilterFileName.fileRef));
+ IndexComponentFileReference cmpRTreeFileName = rtreeFileIter.next();
+ IndexComponentFileReference cmpBTreeFileName = btreeFileIter.next();
+ IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+ validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.getFileRef(), cmpBTreeFileName.getFileRef(),
+ cmpBloomFilterFileName.getFileRef()));
}
return validFiles;
}
@Override
public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
- String ts = getCurrentTimestamp();
+ String baseName = getNextComponentSequence(btreeFilter);
// Create transaction lock file
- Files.createFile(Paths.get(baseDir + TXN_PREFIX + ts));
- String baseName = ts + DELIMITER + ts;
+ Files.createFile(Paths.get(baseDir + TXN_PREFIX + baseName));
return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
index 8ecbcc4..c255ee5 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
@@ -22,16 +22,19 @@
import java.io.FilenameFilter;
import java.util.ArrayList;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
public class TestLsmIndexFileManager extends AbstractLSMIndexFileManager {
+ private long componentSeq = 0;
+
public TestLsmIndexFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> treeIndexFactory) {
super(ioManager, file, treeIndexFactory);
@@ -39,12 +42,18 @@
@Override
protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
- TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles,
- IBufferCache bufferCache) throws HyracksDataException {
+ TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
+ IBufferCache bufferCache) {
String[] files = baseDir.getFile().list(filter);
for (String fileName : files) {
FileReference fileRef = baseDir.getChild(fileName);
- allFiles.add(new ComparableFileName(fileRef));
+ allFiles.add(IndexComponentFileReference.of(fileRef));
}
}
+
+ @Override
+ public LSMComponentFileReferences getRelFlushFileReference() {
+ String sequence = IndexComponentFileReference.getFlushSequence(componentSeq++);
+ return new LSMComponentFileReferences(baseDir.getChild(sequence), null, null);
+ }
}