[NO ISSUE][STO] Introduce Index Checkpoints
- user model changes: no
- storage format changes: yes
- Add index checkpoints.
- Use index checkpoint to determine low watermark
during recovery.
- interface changes: yes
- Introduce IIndexCheckpointManager for managing
indexes checkpoints.
- Introduce IIndexCheckpointProvider for tracking
IIndexCheckpointManager references.
Details:
- Unify LSM flush/merge operations completion order.
- Introduce index checkpoints which contains:
- Index low watermark.
- Latest valid LSM component
- Mapping between master replica and local replica.
- Use index checkpoints instead of LSM component metadata
for identifying low watermark in recovery.
- Use index checkpoints in replication instead of overwriting
LSN byte offset in replica component metadata.
- Replace LSN_MAP used in replication by index checkpoints.
- Replace NIO Files.find by Commons FileUtils.listFiles to
avoid no NoSuchFileException on any file deletion.
Change-Id: Ib22800002bf8ea3660242e599b3f5f20678301a8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2200
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
new file mode 100644
index 0000000..446d04d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class IndexCheckpointManager implements IIndexCheckpointManager {
+
+ private static final Logger LOGGER = Logger.getLogger(IndexCheckpointManager.class.getName());
+ private static final int HISTORY_CHECKPOINTS = 1;
+ private static final int MAX_CHECKPOINT_WRITE_ATTEMPTS = 5;
+ private static final FilenameFilter CHECKPOINT_FILE_FILTER =
+ (file, name) -> name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
+ private static final long BULKLOAD_LSN = 0;
+ private final Path indexPath;
+
+ public IndexCheckpointManager(Path indexPath) {
+ this.indexPath = indexPath;
+ }
+
+ @Override
+ public synchronized void init(long lsn) throws HyracksDataException {
+ final List<IndexCheckpoint> checkpoints = getCheckpoints();
+ if (!checkpoints.isEmpty()) {
+ LOGGER.warning(() -> "Checkpoints found on initializing: " + indexPath);
+ delete();
+ }
+ IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lsn);
+ persist(firstCheckpoint);
+ }
+
+ @Override
+ public synchronized void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException {
+ final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
+ if (localLsn == null) {
+ throw new IllegalStateException("Component flushed before lsn mapping was received");
+ }
+ flushed(componentTimestamp, localLsn);
+ }
+
+ @Override
+ public synchronized void flushed(String componentTimestamp, long lsn) throws HyracksDataException {
+ final IndexCheckpoint latest = getLatest();
+ IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp);
+ persist(nextCheckpoint);
+ deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
+ }
+
+ @Override
+ public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
+ final IndexCheckpoint latest = getLatest();
+ latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
+ final IndexCheckpoint next =
+ IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentTimestamp());
+ persist(next);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized long getLowWatermark() throws HyracksDataException {
+ return getLatest().getLowWatermark();
+ }
+
+ @Override
+ public synchronized boolean isFlushed(long masterLsn) throws HyracksDataException {
+ if (masterLsn == BULKLOAD_LSN) {
+ return true;
+ }
+ return getLatest().getMasterNodeFlushMap().containsKey(masterLsn);
+ }
+
+ @Override
+ public synchronized void advanceLowWatermark(long lsn) throws HyracksDataException {
+ flushed(getLatest().getValidComponentTimestamp(), lsn);
+ }
+
+ @Override
+ public synchronized void delete() {
+ deleteHistory(Long.MAX_VALUE, 0);
+ }
+
+ private IndexCheckpoint getLatest() {
+ final List<IndexCheckpoint> checkpoints = getCheckpoints();
+ if (checkpoints.isEmpty()) {
+ throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
+ }
+ checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
+ return checkpoints.get(0);
+ }
+
+ private List<IndexCheckpoint> getCheckpoints() {
+ List<IndexCheckpoint> checkpoints = new ArrayList<>();
+ final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
+ if (checkpointFiles != null) {
+ for (File checkpointFile : checkpointFiles) {
+ try {
+ checkpoints.add(read(checkpointFile.toPath()));
+ } catch (IOException e) {
+ LOGGER.log(Level.WARNING, e, () -> "Couldn't read index checkpoint file: " + e);
+ }
+ }
+ }
+ return checkpoints;
+ }
+
+ private void persist(IndexCheckpoint checkpoint) throws HyracksDataException {
+ final Path checkpointPath = getCheckpointPath(checkpoint);
+ for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
+ try {
+ // clean up from previous write failure
+ if (checkpointPath.toFile().exists()) {
+ Files.delete(checkpointPath);
+ }
+ try (BufferedWriter writer = Files.newBufferedWriter(checkpointPath)) {
+ writer.write(checkpoint.asJson());
+ }
+ // ensure it was written correctly by reading it
+ read(checkpointPath);
+ } catch (IOException e) {
+ if (i == MAX_CHECKPOINT_WRITE_ATTEMPTS) {
+ throw HyracksDataException.create(e);
+ }
+ LOGGER.log(Level.WARNING, e, () -> "Filed to write checkpoint at: " + indexPath);
+ int nextAttempt = i + 1;
+ LOGGER.info(() -> "Checkpoint write attempt " + nextAttempt + "/" + MAX_CHECKPOINT_WRITE_ATTEMPTS);
+ }
+ }
+ }
+
+ private IndexCheckpoint read(Path checkpointPath) throws IOException {
+ return IndexCheckpoint.fromJson(new String(Files.readAllBytes(checkpointPath)));
+ }
+
+ private void deleteHistory(long latestId, int historyToKeep) {
+ try {
+ final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
+ if (checkpointFiles != null) {
+ for (File checkpointFile : checkpointFiles) {
+ if (getCheckpointIdFromFileName(checkpointFile.toPath()) < (latestId - historyToKeep)) {
+ Files.delete(checkpointFile.toPath());
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, e, () -> "Couldn't delete history checkpoints at " + indexPath);
+ }
+ }
+
+ private Path getCheckpointPath(IndexCheckpoint checkpoint) {
+ return Paths.get(indexPath.toString(),
+ StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + String.valueOf(checkpoint.getId()));
+ }
+
+ private long getCheckpointIdFromFileName(Path checkpointPath) {
+ return Long.valueOf(checkpointPath.getFileName().toString()
+ .substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
new file mode 100644
index 0000000..19ad8f6
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class IndexCheckpointManagerProvider implements IIndexCheckpointManagerProvider {
+
+ private final Map<ResourceReference, IndexCheckpointManager> indexCheckpointManagerMap = new HashMap<>();
+ private final IIOManager ioManager;
+
+ public IndexCheckpointManagerProvider(IIOManager ioManager) {
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public IIndexCheckpointManager get(ResourceReference ref) throws HyracksDataException {
+ synchronized (indexCheckpointManagerMap) {
+ return indexCheckpointManagerMap.computeIfAbsent(ref, this::create);
+ }
+ }
+
+ @Override
+ public void close(ResourceReference ref) {
+ synchronized (indexCheckpointManagerMap) {
+ indexCheckpointManagerMap.remove(ref);
+ }
+ }
+
+ private IndexCheckpointManager create(ResourceReference ref) {
+ try {
+ final Path indexPath = getIndexPath(ref);
+ return new IndexCheckpointManager(indexPath);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private Path getIndexPath(ResourceReference indexRef) throws HyracksDataException {
+ return ioManager.resolve(indexRef.getRelativePath().toString()).getFile().toPath();
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 5cae2d6..b6bf2df 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -60,6 +60,7 @@
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IStorageSubsystem;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -142,6 +143,7 @@
private final IStorageComponentProvider componentProvider;
private IHyracksClientConnection hcc;
private IStorageSubsystem storageSubsystem;
+ private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -182,11 +184,11 @@
lsmIOScheduler = AsynchronousScheduler.INSTANCE;
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
+ indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager);
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
new PersistentLocalResourceRepositoryFactory(ioManager, getServiceContext().getNodeId(),
- metadataProperties);
-
+ metadataProperties, indexCheckpointManagerProvider);
localResourceRepository =
(PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
@@ -203,11 +205,10 @@
}
localResourceRepository.deleteStorageData();
}
-
datasetMemoryManager = new DatasetMemoryManager(storageProperties);
datasetLifecycleManager =
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
- datasetMemoryManager, ioManager.getIODevices().size());
+ datasetMemoryManager, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
@@ -220,7 +221,8 @@
if (replicationProperties.isParticipant(getServiceContext().getNodeId())) {
- replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
+ replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties,
+ indexCheckpointManagerProvider);
replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -229,13 +231,13 @@
//LogManager to replicate logs
txnSubsystem.getLogManager().setReplicationManager(replicationManager);
- //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
+ //PersistentLocalResourceRepository to replicated metadata files and delete backups on drop index
localResourceRepository.setReplicationManager(replicationManager);
/*
* add the partitions that will be replicated in this node as inactive partitions
*/
- //get nodes which replicate to this node
+ //get nodes which replicated to this node
Set<String> remotePrimaryReplicas = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
for (String clientId : remotePrimaryReplicas) {
//get the partitions of each client
@@ -529,4 +531,9 @@
public IStorageSubsystem getStorageSubsystem() {
return storageSubsystem;
}
+
+ @Override
+ public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+ return indexCheckpointManagerProvider;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index e29e3fe..f0ed5e9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -43,11 +43,14 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -293,6 +296,8 @@
IAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
+ final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+ ((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
@@ -356,18 +361,15 @@
index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
-
- //#. get maxDiskLastLSN
- ILSMIndex lsmIndex = index;
try {
+ final DatasetResourceReference resourceReference =
+ DatasetResourceReference.of(localResource);
maxDiskLastLsn =
- ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
- .getComponentLSN(lsmIndex.getDiskComponents());
+ indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
} catch (HyracksDataException e) {
datasetLifecycleManager.close(localResource.getPath());
throw e;
}
-
//#. set resourceId and maxDiskLastLSN to the map
resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
} else {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index fea6cd8..4bfc581 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -22,6 +22,8 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -58,7 +60,7 @@
// Whenever this is called, it resets the counter
// However, the counters for the failed operations are never reset since we expect them
// To be always 0
- return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator());
+ return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
public int getTotalFlushes() {
@@ -100,8 +102,9 @@
public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
private final TestLsmBtree lsmBtree;
- public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
- super(index, idGenerator);
+ public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvider) {
+ super(index, idGenerator, checkpointManagerProvider);
lsmBtree = (TestLsmBtree) index;
}
@@ -121,7 +124,8 @@
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
+ throws HyracksDataException {
lsmBtree.afterIoFinalizeCalled();
super.afterFinalize(opType, newComponent);
synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index ad83d60..b909e91 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -225,18 +225,10 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-btree</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-am-bloomfilter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-rtree</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
</dependency>
<dependency>
@@ -245,10 +237,6 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 162e693..0503c09 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -28,7 +28,9 @@
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -118,4 +120,6 @@
INCServiceContext getServiceContext();
IStorageSubsystem getStorageSubsystem();
+
+ IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 9f57981..9ec13ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -46,7 +46,7 @@
@Override
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.REPLICATE) {
+ if (opType == LSMOperationType.REPLICATE) {
dsInfo.undeclareActiveIOOperation();
}
}
@@ -54,14 +54,11 @@
@Override
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- if (opType == LSMOperationType.MERGE) {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
dsInfo.undeclareActiveIOOperation();
}
}
- public void exclusiveJobCommitted() throws HyracksDataException {
- }
-
@Override
public void beforeTransaction(long resourceId) {
/*
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index ce43bca..6a1ebfb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -35,6 +35,8 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -64,13 +66,16 @@
private final LogRecord logRecord;
private final int numPartitions;
private volatile boolean stopped = false;
+ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
- ILogManager logManager, IDatasetMemoryManager memoryManager, int numPartitions) {
+ ILogManager logManager, IDatasetMemoryManager memoryManager,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider, int numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
this.memoryManager = memoryManager;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.numPartitions = numPartitions;
logRecord = new LogRecord();
}
@@ -149,12 +154,7 @@
// TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
DatasetInfo dsInfo = dsr.getDatasetInfo();
dsInfo.waitForIO();
- if (iInfo.isOpen()) {
- ILSMOperationTracker indexOpTracker = iInfo.getIndex().getOperationTracker();
- synchronized (indexOpTracker) {
- iInfo.getIndex().deactivate(false);
- }
- }
+ closeIndex(iInfo);
dsInfo.getIndexes().remove(resourceID);
if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
&& !dsInfo.isExternal()) {
@@ -451,13 +451,7 @@
throw HyracksDataException.create(e);
}
for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- if (iInfo.isOpen()) {
- ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
- synchronized (opTracker) {
- iInfo.getIndex().deactivate(false);
- }
- iInfo.setOpen(false);
- }
+ closeIndex(iInfo);
}
removeDatasetFromCache(dsInfo.getDatasetID());
dsInfo.setOpen(false);
@@ -579,4 +573,15 @@
}
}
}
+
+ private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
+ if (indexInfo.isOpen()) {
+ ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
+ synchronized (opTracker) {
+ indexInfo.getIndex().deactivate(false);
+ }
+ indexCheckpointManagerProvider.close(DatasetResourceReference.of(indexInfo.getLocalResource()));
+ indexInfo.setOpen(false);
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index f6e2b0d..c02de7e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -108,7 +108,7 @@
if (index == null) {
throw new HyracksDataException("Attempt to register a null index");
}
- datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resourceID,
+ datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
((DatasetLocalResource) resource.getResource()).getPartition()));
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
index 9eb5b6c..b094b6f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -19,17 +19,20 @@
package org.apache.asterix.common.context;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.LocalResource;
public class IndexInfo extends Info {
private final ILSMIndex index;
private final int datasetId;
private final long resourceId;
private final int partition;
+ private final LocalResource localResource;
- public IndexInfo(ILSMIndex index, int datasetId, long resourceId, int partition) {
+ public IndexInfo(ILSMIndex index, int datasetId, LocalResource localResource, int partition) {
this.index = index;
this.datasetId = datasetId;
- this.resourceId = resourceId;
+ this.localResource = localResource;
+ this.resourceId = localResource.getId();
this.partition = partition;
}
@@ -48,4 +51,8 @@
public int getDatasetId() {
return datasetId;
}
+
+ public LocalResource getLocalResource() {
+ return localResource;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index ababe9c..14e91ba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -73,7 +73,7 @@
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
// Searches are immediately considered complete, because they should not prevent the execution of flushes.
- if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.REPLICATE) {
+ if (opType == LSMOperationType.REPLICATE) {
completeOperation(index, opType, searchCallback, modificationCallback);
}
}
@@ -160,12 +160,6 @@
flushLogCreated = false;
}
- @Override
- public void exclusiveJobCommitted() throws HyracksDataException {
- numActiveOperations.set(0);
- flushIfRequested();
- }
-
public int getNumActiveOperations() {
return numActiveOperations.get();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
index 04090bb..e844192 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
@@ -41,11 +41,4 @@
}
}
}
-
- public static long getComponentFileLSNOffset(ILSMIndex lsmIndex, ILSMDiskComponent lsmComponent,
- String componentFilePath) throws HyracksDataException {
- AbstractLSMIOOperationCallback ioOpCallback =
- (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
- return ioOpCallback.getComponentFileLSNOffset(lsmComponent, componentFilePath);
- }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 1432f25..c625988 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -19,8 +19,13 @@
package org.apache.asterix.common.ioopcallbacks;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -33,6 +38,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -61,10 +67,14 @@
protected ILSMComponentId[] nextComponentIds;
protected final ILSMComponentIdGenerator idGenerator;
+ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+ private final Map<ILSMComponentId, Long> componentLsnMap = new HashMap<>();
- public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
+ public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.lsmIndex = lsmIndex;
this.idGenerator = idGenerator;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
int count = lsmIndex.getNumberOfAllMemoryComponents();
mutableLastLSNs = new long[count];
firstLSNs = new long[count];
@@ -104,42 +114,59 @@
public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
ILSMDiskComponent newComponent) throws HyracksDataException {
//TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
- if (newComponent != null) {
- putLSNIntoMetadata(newComponent, oldComponents);
- putComponentIdIntoMetadata(opType, newComponent, oldComponents);
- if (opType == LSMIOOperationType.MERGE) {
- // In case of merge, oldComponents are never null
- LongPointable markerLsn =
- LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
- ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
- newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
- } else if (opType == LSMIOOperationType.FLUSH) {
- // advance memory component indexes
- synchronized (this) {
- // we've already consumed the specified LSN/component id.
- // Now we can advance to the next component
- flushRequested[readIndex] = false;
- // if the component which just finished flushing is the component that will be modified next,
- // we set its first LSN to its previous LSN
- if (readIndex == writeIndex) {
- firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
- }
- readIndex = (readIndex + 1) % mutableLastLSNs.length;
+ if (newComponent == null) {
+ // failed operation. Nothing to do.
+ return;
+ }
+ putLSNIntoMetadata(newComponent, oldComponents);
+ putComponentIdIntoMetadata(opType, newComponent, oldComponents);
+ componentLsnMap.put(newComponent.getId(), getComponentLSN(oldComponents));
+ if (opType == LSMIOOperationType.MERGE) {
+ if (oldComponents == null) {
+ throw new IllegalStateException("Merge must have old components");
+ }
+ LongPointable markerLsn = LongPointable.FACTORY.createPointable(ComponentUtils
+ .getLong(oldComponents.get(0).getMetadata(), ComponentUtils.MARKER_LSN_KEY,
+ ComponentUtils.NOT_FOUND));
+ newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
+ } else if (opType == LSMIOOperationType.FLUSH) {
+ // advance memory component indexes
+ synchronized (this) {
+ // we've already consumed the specified LSN/component id.
+ // Now we can advance to the next component
+ flushRequested[readIndex] = false;
+ // if the component which just finished flushing is the component that will be modified next,
+ // we set its first LSN to its previous LSN
+ if (readIndex == writeIndex) {
+ firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
}
+ readIndex = (readIndex + 1) % mutableLastLSNs.length;
}
}
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException {
// The operation was complete and the next I/O operation for the LSM index didn't start yet
if (opType == LSMIOOperationType.FLUSH) {
hasFlushed = true;
+ if (newComponent != null) {
+ final Long lsn = componentLsnMap.remove(newComponent.getId());
+ if (lsn == null) {
+ throw new IllegalStateException("Unidentified flushed component: " + newComponent);
+ }
+ // empty component doesn't have any files
+ final Optional<String> componentFile = newComponent.getLSMComponentPhysicalFiles().stream().findAny();
+ if (componentFile.isPresent()) {
+ final ResourceReference ref = ResourceReference.of(componentFile.get());
+ final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
+ indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn);
+ }
+ }
}
-
}
- public void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
+ private void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
throws HyracksDataException {
newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
}
@@ -155,8 +182,8 @@
if (mergedComponents == null || mergedComponents.isEmpty()) {
return null;
}
- return LSMComponentIdUtils.union(mergedComponents.get(0).getId(),
- mergedComponents.get(mergedComponents.size() - 1).getId());
+ return LSMComponentIdUtils
+ .union(mergedComponents.get(0).getId(), mergedComponents.get(mergedComponents.size() - 1).getId());
}
@@ -186,7 +213,7 @@
}
}
- public void setFirstLSN(long firstLSN) {
+ public synchronized void setFirstLSN(long firstLSN) {
// We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
firstLSNs[writeIndex] = firstLSN;
}
@@ -212,8 +239,7 @@
// Implies a flush IO operation. --> moves the flush pointer
// Flush operation of an LSM index are executed sequentially.
synchronized (this) {
- long lsn = mutableLastLSNs[readIndex];
- return lsn;
+ return mutableLastLSNs[readIndex];
}
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
@@ -246,15 +272,4 @@
component.resetId(componentId);
}
}
-
- /**
- * @param component
- * @param componentFilePath
- * @return The LSN byte offset in the LSM disk component if the index is valid,
- * otherwise {@link IMetadataPageManager#INVALID_LSN_OFFSET}.
- * @throws HyracksDataException
- */
- public abstract long getComponentFileLSNOffset(ILSMDiskComponent component, String componentFilePath)
- throws HyracksDataException;
-
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
index 5dff7f4..ed56ab1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -21,6 +21,8 @@
import java.io.ObjectStreamException;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -49,6 +51,10 @@
return idGeneratorFactory.getComponentIdGenerator(ncCtx);
}
+ protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+ return ((INcApplicationContext) ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
+ }
+
private void readObjectNoData() throws ObjectStreamException {
idGeneratorFactory = new ILSMComponentIdGeneratorFactory() {
private static final long serialVersionUID = 1L;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index c1ee03b..db6c609 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -19,28 +19,14 @@
package org.apache.asterix.common.ioopcallbacks;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
- super(index, idGenerator);
- }
-
- @Override
- public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
- throws HyracksDataException {
- if (diskComponentFilePath.endsWith(LSMBTreeFileManager.BTREE_SUFFIX)) {
- IMetadataPageManager metadataPageManager =
- (IMetadataPageManager) ((BTree) diskComponent.getIndex()).getPageManager();
- return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
- }
- return INVALID;
+ public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvider) {
+ super(index, idGenerator, checkpointManagerProvider);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 4ef12ef..95245cb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -33,6 +33,6 @@
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator());
+ return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index b43fb2f..da1446b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -18,28 +18,14 @@
*/
package org.apache.asterix.common.ioopcallbacks;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
- super(lsmIndex, idGenerator);
- }
-
- @Override
- public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
- throws HyracksDataException {
- if (diskComponentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_SUFFIX)) {
- IMetadataPageManager metadataPageManager =
- (IMetadataPageManager) ((BTree) diskComponent.getIndex()).getPageManager();
- return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
- }
- return INVALID;
+ public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvider) {
+ super(lsmIndex, idGenerator, checkpointManagerProvider);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 6727bf6..6c75ed6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -32,6 +32,7 @@
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator());
+ return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator(),
+ getIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 015cd38..3ba9bcd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -19,29 +19,14 @@
package org.apache.asterix.common.ioopcallbacks;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
-import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
- super(index, idGenerator);
- }
-
- @Override
- public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
- throws HyracksDataException {
- if (diskComponentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
- LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) diskComponent;
- IMetadataPageManager metadataPageManager =
- (IMetadataPageManager) invIndexComponent.getBuddyIndex().getPageManager();
- return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
- }
- return INVALID;
+ public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvider) {
+ super(index, idGenerator, checkpointManagerProvider);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index a2712d1..fb73d19 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -33,6 +33,7 @@
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator());
+ return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator(),
+ getIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index bc79074..f3e80ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -19,28 +19,14 @@
package org.apache.asterix.common.ioopcallbacks;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
-import org.apache.hyracks.storage.am.rtree.impls.RTree;
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
- public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
- super(index, idGenerator);
- }
-
- @Override
- public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
- throws HyracksDataException {
- if (diskComponentFilePath.endsWith(LSMRTreeFileManager.RTREE_SUFFIX)) {
- IMetadataPageManager metadataPageManager =
- (IMetadataPageManager) ((RTree) diskComponent.getIndex()).getPageManager();
- return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
- }
- return INVALID;
+ public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvodier) {
+ super(index, idGenerator, checkpointManagerProvodier);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 087aaae..94be0bb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -33,6 +33,6 @@
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator());
+ return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
index d05321e..c488b65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
@@ -28,6 +28,7 @@
private int datasetId;
private int partitionId;
+ private long resourceId;
private DatasetResourceReference() {
super();
@@ -53,6 +54,10 @@
return partitionId;
}
+ public long getResourceId() {
+ return resourceId;
+ }
+
private static DatasetResourceReference parse(LocalResource localResource) {
final DatasetResourceReference datasetResourceReference = new DatasetResourceReference();
final String filePath = Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString();
@@ -73,5 +78,28 @@
final DatasetLocalResource dsResource = (DatasetLocalResource) localResource.getResource();
lrr.datasetId = dsResource.getDatasetId();
lrr.partitionId = dsResource.getPartition();
+ lrr.resourceId = localResource.getId();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o != null && o instanceof ResourceReference) {
+ ResourceReference that = (ResourceReference) o;
+ return getRelativePath().toString().equals(that.getRelativePath().toString());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getRelativePath().toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getRelativePath().toString();
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
new file mode 100644
index 0000000..afa3823
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIndexCheckpointManager {
+
+ /**
+ * Initializes the first checkpoint of an index with low watermark {@code lsn}
+ *
+ * @param lsn
+ * @throws HyracksDataException
+ */
+ void init(long lsn) throws HyracksDataException;
+
+ /**
+ * Called when a new LSM disk component is flushed. When called, the index checkpoiint is updated
+ * with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
+ *
+ * @param componentTimestamp
+ * @param lsn
+ * @throws HyracksDataException
+ */
+ void flushed(String componentTimestamp, long lsn) throws HyracksDataException;
+
+ /**
+ * Called when a new LSM disk component is replicated from master. When called, the index checkpoiint is updated
+ * with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
+ * new low watermark.
+ *
+ * @param componentTimestamp
+ * @param masterLsn
+ * @throws HyracksDataException
+ */
+ void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException;
+
+ /**
+ * Called when a flush log is received and replicated from master. The mapping between
+ * {@code masterLsn} and {@code localLsn} is updated in the checkpoint.
+ *
+ * @param masterLsn
+ * @param localLsn
+ * @throws HyracksDataException
+ */
+ void masterFlush(long masterLsn, long localLsn) throws HyracksDataException;
+
+ /**
+ * The index low watermark
+ *
+ * @return The low watermark
+ * @throws HyracksDataException
+ */
+ long getLowWatermark() throws HyracksDataException;
+
+ /**
+ * True if a mapping exists between {@code masterLsn} and a localLsn. Otherwise false.
+ *
+ * @param masterLsn
+ * @return True if the mapping exists. Otherwise false.
+ * @throws HyracksDataException
+ */
+ boolean isFlushed(long masterLsn) throws HyracksDataException;
+
+ /**
+ * Advance the index low watermark to {@code lsn}
+ *
+ * @param lsn
+ * @throws HyracksDataException
+ */
+ void advanceLowWatermark(long lsn) throws HyracksDataException;
+
+ /**
+ * Deletes all checkpoints
+ */
+ void delete();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
new file mode 100644
index 0000000..e6cef57
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIndexCheckpointManagerProvider {
+
+ /**
+ * Gets {@link IIndexCheckpointManager} for the index referenced by {@code ref}
+ *
+ * @param ref
+ * @return The index checkpoint manager.
+ * @throws HyracksDataException
+ */
+ IIndexCheckpointManager get(ResourceReference ref) throws HyracksDataException;
+
+ /**
+ * Closes any resources used by the index checkpoint manager referenced by {@code ref}
+ *
+ * @param ref
+ */
+ void close(ResourceReference ref);
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
new file mode 100644
index 0000000..6e845e1
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class IndexCheckpoint {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final long INITIAL_CHECKPOINT_ID = 0;
+ private long id;
+ private String validComponentTimestamp;
+ private long lowWatermark;
+ private Map<Long, Long> masterNodeFlushMap;
+
+ public static IndexCheckpoint first(long lowWatermark) {
+ IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
+ firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
+ firstCheckpoint.lowWatermark = lowWatermark;
+ firstCheckpoint.validComponentTimestamp = null;
+ firstCheckpoint.masterNodeFlushMap = new HashMap<>();
+ return firstCheckpoint;
+ }
+
+ public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp) {
+ if (lowWatermark < latest.getLowWatermark()) {
+ throw new IllegalStateException("Low watermark should always be increasing");
+ }
+ IndexCheckpoint next = new IndexCheckpoint();
+ next.id = latest.getId() + 1;
+ next.lowWatermark = lowWatermark;
+ next.validComponentTimestamp = validComponentTimestamp;
+ next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
+ // remove any lsn from the map that wont be used anymore
+ next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark);
+ return next;
+ }
+
+ @JsonCreator
+ private IndexCheckpoint() {
+ }
+
+ public String getValidComponentTimestamp() {
+ return validComponentTimestamp;
+ }
+
+ public long getLowWatermark() {
+ return lowWatermark;
+ }
+
+ public Map<Long, Long> getMasterNodeFlushMap() {
+ return masterNodeFlushMap;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String asJson() throws HyracksDataException {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public static IndexCheckpoint fromJson(String json) throws HyracksDataException {
+ try {
+ return OBJECT_MAPPER.readValue(json, IndexCheckpoint.class);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index bd057fa..4aa6982 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -107,4 +107,26 @@
ref.root = tokens[--offset];
ref.rebalance = String.valueOf(0);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o != null && o instanceof ResourceReference) {
+ ResourceReference that = (ResourceReference) o;
+ return getRelativePath().toString().equals(that.getRelativePath().toString());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getRelativePath().toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getRelativePath().toString();
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 6262f71..220b089 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.utils;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
/**
* A static class that stores storage constants
@@ -27,6 +28,12 @@
public static final String STORAGE_ROOT_DIR_NAME = "storage";
public static final String PARTITION_DIR_PREFIX = "partition_";
+ /**
+ * Any file that shares the same directory as the LSM index files must
+ * begin with ".". Otherwise {@link AbstractLSMIndexFileManager} will try to
+ * use them as index files.
+ */
+ public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_";
public static final String METADATA_FILE_NAME = ".metadata";
public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 2928d90..3aa7b17 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.LocalResource;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -237,8 +238,8 @@
Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
-
- return new IndexInfo(index, DATASET_ID, 0, partition);
+ final LocalResource localResource = Mockito.mock(LocalResource.class);
+ return new IndexInfo(index, DATASET_ID, localResource, partition);
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
index 0f2ea50..1c57d1b 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
@@ -21,6 +21,8 @@
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -45,8 +47,8 @@
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
- LSMBTreeIOOperationCallback callback =
- new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+ LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+ mockIndexCheckpointManagerProvider());
//request to flush first component
callback.updateLastLSN(1);
@@ -57,13 +59,14 @@
callback.beforeOperation(LSMIOOperationType.FLUSH);
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
- callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ final ILSMDiskComponent diskComponent1 = mockDiskComponent();
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1);
Assert.assertEquals(2, callback.getComponentLSN(null));
-
- callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
- callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ final ILSMDiskComponent diskComponent2 = mockDiskComponent();
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
}
@Test
@@ -72,8 +75,8 @@
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
- LSMBTreeIOOperationCallback callback =
- new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+ LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+ mockIndexCheckpointManagerProvider());
//request to flush first component
callback.updateLastLSN(1);
@@ -90,11 +93,13 @@
//the scheduleFlush request would fail this time
Assert.assertEquals(1, callback.getComponentLSN(null));
- callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
- callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
-
+ final ILSMDiskComponent diskComponent1 = mockDiskComponent();
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1);
+ final ILSMDiskComponent diskComponent2 = mockDiskComponent();
Assert.assertEquals(2, callback.getComponentLSN(null));
- callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
}
@Test
@@ -103,9 +108,8 @@
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
- LSMBTreeIOOperationCallback callback =
- new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
+ LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+ mockIndexCheckpointManagerProvider());
//request to flush first component
callback.updateLastLSN(1);
callback.beforeOperation(LSMIOOperationType.FLUSH);
@@ -144,7 +148,8 @@
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
ILSMComponentId initialId = idGenerator.getId();
// simulate a partition is flushed before allocated
@@ -162,7 +167,8 @@
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
ILSMComponentId id = idGenerator.getId();
callback.allocated(mockComponent);
@@ -178,8 +184,9 @@
callback.beforeOperation(LSMIOOperationType.FLUSH);
callback.recycled(mockComponent, true);
- callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
- callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+ final ILSMDiskComponent diskComponent = mockDiskComponent();
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent);
checkMemoryComponent(expectedId, mockComponent);
}
}
@@ -191,7 +198,8 @@
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
ILSMComponentId id = idGenerator.getId();
callback.allocated(mockComponent);
@@ -216,7 +224,8 @@
ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+ LSMBTreeIOOperationCallback callback =
+ new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
ILSMComponentId id = idGenerator.getId();
callback.allocated(mockComponent);
@@ -230,7 +239,9 @@
callback.updateLastLSN(0);
callback.beforeOperation(LSMIOOperationType.FLUSH);
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ final ILSMDiskComponent diskComponent = mockDiskComponent();
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent);
// another flush is to be scheduled before the component is recycled
idGenerator.refresh();
@@ -243,7 +254,9 @@
// schedule the next flush
callback.updateLastLSN(0);
callback.beforeOperation(LSMIOOperationType.FLUSH);
- callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+ final ILSMDiskComponent diskComponent2 = mockDiskComponent();
+ callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
+ callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
callback.recycled(mockComponent, true);
checkMemoryComponent(nextId, mockComponent);
}
@@ -263,5 +276,14 @@
return component;
}
- protected abstract AbstractLSMIOOperationCallback getIoCallback();
+ protected IIndexCheckpointManagerProvider mockIndexCheckpointManagerProvider() throws HyracksDataException {
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+ Mockito.mock(IIndexCheckpointManagerProvider.class);
+ IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class);
+ Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong());
+ Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
+ return indexCheckpointManagerProvider;
+ }
+
+ protected abstract AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
index c22e2e3..a4bc399 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.mockito.Mockito;
@@ -28,10 +29,11 @@
public class LSMBTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
@Override
- protected AbstractLSMIOOperationCallback getIoCallback() {
+ protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+ return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+ mockIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
index 356c80a..5f37c78 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.mockito.Mockito;
@@ -28,10 +29,11 @@
public class LSMBTreeWithBuddyIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
@Override
- protected AbstractLSMIOOperationCallback getIoCallback() {
+ protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+ return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+ mockIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
index ac4595e..343bc59 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.mockito.Mockito;
@@ -28,10 +29,11 @@
public class LSMInvertedIndexIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
@Override
- protected AbstractLSMIOOperationCallback getIoCallback() {
+ protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+ return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+ mockIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
index 0131e3f..10d95d8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.mockito.Mockito;
@@ -28,10 +29,11 @@
public class LSMRTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
@Override
- protected AbstractLSMIOOperationCallback getIoCallback() {
+ protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException {
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
- return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+ return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+ mockIndexCheckpointManagerProvider());
}
}
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
index 814e109..1434629 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
@@ -19,43 +19,18 @@
package org.apache.asterix.installer.test;
import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.logging.Logger;
-import org.apache.asterix.event.error.VerificationUtil;
-import org.apache.asterix.event.model.AsterixInstance;
import org.apache.asterix.event.model.AsterixInstance.State;
-import org.apache.asterix.event.model.AsterixRuntimeState;
-import org.apache.asterix.event.service.ServiceProvider;
-import org.apache.asterix.installer.command.CommandHandler;
-import org.apache.asterix.test.common.TestExecutor;
-import org.apache.asterix.testframework.context.TestCaseContext;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-import org.junit.runners.Parameterized.Parameters;
-
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class AsterixLifecycleIT {
- private static final int NUM_NC = 2;
- private static final CommandHandler cmdHandler = new CommandHandler();
- private static final String PATH_BASE = "src/test/resources/integrationts/lifecycle";
private static final String PATH_ACTUAL = "target" + File.separator + "ittest" + File.separator;
- private static final Logger LOGGER = Logger.getLogger(AsterixLifecycleIT.class.getName());
- private static List<TestCaseContext> testCaseCollection;
- private final TestExecutor testExecutor = new TestExecutor();
@BeforeClass
public static void setUp() throws Exception {
AsterixInstallerIntegrationUtil.init(AsterixInstallerIntegrationUtil.LOCAL_CLUSTER_PATH);
- TestCaseContext.Builder b = new TestCaseContext.Builder();
- testCaseCollection = b.build(new File(PATH_BASE));
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
}
@@ -70,88 +45,8 @@
}
}
- @Parameters
- public static Collection<Object[]> tests() throws Exception {
- Collection<Object[]> testArgs = new ArrayList<>();
- return testArgs;
- }
-
public static void restartInstance() throws Exception {
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
}
-
- @Test
- public void test_1_StopActiveInstance() throws Exception {
- try {
- LOGGER.info("Starting test: test_1_StopActiveInstance");
- AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
- String command = "stop -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
- cmdHandler.processCommand(command.split(" "));
- Thread.sleep(4000);
- AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService()
- .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
- AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance);
- assert (state.getFailedNCs().size() == NUM_NC && !state.isCcRunning());
- LOGGER.info("PASSED: test_1_StopActiveInstance");
- } catch (Exception e) {
- throw new Exception("Test configure installer " + "\" FAILED!", e);
- }
- }
-
- @Test
- public void test_2_StartActiveInstance() throws Exception {
- try {
- LOGGER.info("Starting test: test_2_StartActiveInstance");
- AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
- String command = "start -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
- cmdHandler.processCommand(command.split(" "));
- AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService()
- .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
- AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance);
- assert (state.getFailedNCs().size() == 0 && state.isCcRunning());
- LOGGER.info("PASSED: test_2_StartActiveInstance");
- } catch (Exception e) {
- throw new Exception("Test configure installer " + "\" FAILED!", e);
- }
- }
-
- @Test
- public void test_3_DeleteActiveInstance() throws Exception {
- try {
- LOGGER.info("Starting test: test_3_DeleteActiveInstance");
- AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
- String command = "delete -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
- cmdHandler.processCommand(command.split(" "));
- AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService()
- .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
- assert (instance == null);
- LOGGER.info("PASSED: test_3_DeleteActiveInstance");
- } catch (Exception e) {
- throw new Exception("Test delete active instance " + "\" FAILED!", e);
- } finally {
- // recreate instance
- AsterixInstallerIntegrationUtil.createInstance();
- }
- }
-
- @Test
- public void test() throws Exception {
- for (TestCaseContext testCaseCtx : testCaseCollection) {
- testExecutor.executeTest(PATH_ACTUAL, testCaseCtx, null, false);
- }
- }
-
- public static void main(String[] args) throws Exception {
- try {
- setUp();
- new AsterixLifecycleIT().test();
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.info("TEST CASE(S) FAILED");
- } finally {
- tearDown();
- }
- }
-
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
index 3b2aff7..0444952 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
@@ -27,10 +27,6 @@
private long localLSN;
public AtomicInteger numOfFlushedIndexes = new AtomicInteger();
- public String getRemoteNodeID() {
- return remoteNodeID;
- }
-
public void setRemoteNodeID(String remoteNodeID) {
this.remoteNodeID = remoteNodeID;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 3143284..dfc23d3 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -27,9 +27,6 @@
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -39,13 +36,17 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -55,6 +56,9 @@
import org.apache.asterix.common.replication.IReplicationThread;
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
@@ -72,9 +76,13 @@
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.transaction.management.service.logging.LogBuffer;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.util.StorageUtil;
import org.apache.hyracks.util.StorageUtil.StorageUnit;
@@ -89,7 +97,6 @@
private final String localNodeID;
private final ILogManager logManager;
private final ReplicaResourcesManager replicaResourcesManager;
- private SocketChannel socketChannel = null;
private ServerSocketChannel serverSocketChannel = null;
private final IReplicationManager replicationManager;
private final ReplicationProperties replicationProperties;
@@ -98,6 +105,7 @@
private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
+ private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping;
private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
private final LSMComponentsSyncService lsmComponentLSNMappingService;
private final Set<Integer> nodeHostedPartitions;
@@ -105,6 +113,7 @@
private final Object flushLogslock = new Object();
private final IDatasetLifecycleManager dsLifecycleManager;
private final PersistentLocalResourceRepository localResourceRep;
+ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager,
IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
@@ -122,6 +131,7 @@
pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
+ localLsn2RemoteMapping = new ConcurrentHashMap<>();
lsmComponentLSNMappingService = new LSMComponentsSyncService();
replicationNotifier = new ReplicationNotifier();
replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
@@ -136,6 +146,8 @@
}
nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
nodeHostedPartitions.addAll(clientsPartitions);
+ this.indexCheckpointManagerProvider =
+ ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider();
}
@Override
@@ -156,7 +168,7 @@
//start accepting replication requests
while (true) {
- socketChannel = serverSocketChannel.accept();
+ SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(true);
//start a new thread to handle the request
replicationThreads.execute(new ReplicationThread(socketChannel));
@@ -349,16 +361,19 @@
}
if (afp.isLSMComponentFile()) {
String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
- if (afp.getLSNByteOffset() > AbstractLSMIOOperationCallback.INVALID) {
- LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId,
- destFile.getAbsolutePath(), afp.getLSNByteOffset());
+ final LSMComponentProperties lsmComponentProperties = lsmComponentId2PropertiesMap.get(componentId);
+ // merge operations do not generate flush logs
+ if (afp.requiresAck() && lsmComponentProperties.getOpType() == LSMOperationType.FLUSH) {
+ LSMComponentLSNSyncTask syncTask =
+ new LSMComponentLSNSyncTask(componentId, destFile.getAbsolutePath());
lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
} else {
updateLSMComponentRemainingFiles(componentId);
}
} else {
//index metadata file
- replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
+ final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath());
+ indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN());
}
}
}
@@ -402,8 +417,7 @@
try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
FileChannel fileChannel = fromFile.getChannel();) {
long fileSize = fileChannel.size();
- fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false,
- AbstractLSMIOOperationCallback.INVALID, false);
+ fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false, false);
outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
ReplicationRequestType.REPLICATE_FILE);
@@ -462,7 +476,7 @@
switch (remoteLog.getLogType()) {
case LogType.UPDATE:
case LogType.ENTITY_COMMIT:
- //if the log partition belongs to a partitions hosted on this node, replicate it
+ //if the log partition belongs to a partitions hosted on this node, replicated it
if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
logManager.log(remoteLog);
}
@@ -479,13 +493,21 @@
case LogType.FLUSH:
//store mapping information for flush logs to use them in incoming LSM components.
RemoteLogMapping flushLogMap = new RemoteLogMapping();
+ LogRecord flushLog = new LogRecord();
+ TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null,
+ remoteLog.getNodeId(), remoteLog.getNumOfFlushedIndexes());
+ flushLog.setReplicationThread(this);
+ flushLog.setLogSource(LogSource.REMOTE);
flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
flushLogMap.setRemoteLSN(remoteLog.getLSN());
- logManager.log(remoteLog);
- //the log LSN value is updated by logManager.log(.) to a local value
- flushLogMap.setLocalLSN(remoteLog.getLSN());
- flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
- replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+ synchronized (localLsn2RemoteMapping) {
+ logManager.log(flushLog);
+ //the log LSN value is updated by logManager.log(.) to a local value
+ flushLogMap.setLocalLSN(flushLog.getLSN());
+ flushLogMap.numOfFlushedIndexes.set(flushLog.getNumOfFlushedIndexes());
+ replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+ localLsn2RemoteMapping.put(flushLog.getLSN(), flushLogMap);
+ }
synchronized (flushLogslock) {
flushLogslock.notify();
}
@@ -497,18 +519,55 @@
}
/**
- * this method is called sequentially by LogPage (notifyReplicationTerminator)
- * for JOB_COMMIT and JOB_ABORT log types.
+ * this method is called sequentially by {@link LogBuffer#notifyReplicationTermination()}
+ * for JOB_COMMIT, JOB_ABORT, and FLUSH log types.
*/
@Override
public void notifyLogReplicationRequester(LogRecord logRecord) {
- pendingNotificationRemoteLogsQ.offer(logRecord);
+ switch (logRecord.getLogType()) {
+ case LogType.JOB_COMMIT:
+ case LogType.ABORT:
+ pendingNotificationRemoteLogsQ.offer(logRecord);
+ break;
+ case LogType.FLUSH:
+ final RemoteLogMapping remoteLogMapping;
+ synchronized (localLsn2RemoteMapping) {
+ remoteLogMapping = localLsn2RemoteMapping.remove(logRecord.getLSN());
+ }
+ checkpointReplicaIndexes(remoteLogMapping, logRecord.getDatasetId());
+ break;
+ default:
+ throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType());
+ }
}
@Override
public SocketChannel getReplicationClientSocket() {
return socketChannel;
}
+
+ private void checkpointReplicaIndexes(RemoteLogMapping remoteLogMapping, int datasetId) {
+ try {
+ Predicate<LocalResource> replicaIndexesPredicate = lr -> {
+ DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
+ return dls.getDatasetId() == datasetId && !localResourceRep.getActivePartitions()
+ .contains(dls.getPartition());
+ };
+ final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate);
+ final List<DatasetResourceReference> replicaIndexesRef =
+ resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+ for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
+ final IIndexCheckpointManager indexCheckpointManager =
+ indexCheckpointManagerProvider.get(replicaIndexRef);
+ synchronized (indexCheckpointManager) {
+ indexCheckpointManager
+ .masterFlush(remoteLogMapping.getRemoteLSN(), remoteLogMapping.getLocalLSN());
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failed to checkpoint replica indexes", e);
+ }
+ }
}
/**
@@ -541,7 +600,6 @@
* the received LSM components to a local LSN.
*/
private class LSMComponentsSyncService extends Thread {
- private static final int BULKLOAD_LSN = 0;
@Override
public void run() {
@@ -560,90 +618,22 @@
LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", e);
}
}
-
}
}
private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask)
throws InterruptedException, IOException {
- long remoteLSN = lsmCompProp.getOriginalLSN();
- //LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it
- if (remoteLSN == BULKLOAD_LSN) {
- //since this is the first LSM component of this index,
- //then set the mapping in the LSN_MAP to the current log LSN because
- //no other log could've been received for this index since bulkload replication is synchronous.
- lsmCompProp.setReplicaLSN(logManager.getAppendLSN());
- return;
- }
-
- //path to the LSM component file
- Path path = Paths.get(syncTask.getComponentFilePath());
- if (lsmCompProp.getReplicaLSN() == null) {
- if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
- //need to look up LSN mapping from memory
- RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
- //wait until flush log arrives, and verify the LSM component file still exists
- //The component file could be deleted if its NC fails.
- while (remoteLogMap == null && Files.exists(path)) {
- synchronized (flushLogslock) {
- flushLogslock.wait();
- }
- remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
- }
-
- /**
- * file has been deleted due to its remote primary replica failure
- * before its LSN could've been synchronized.
- */
- if (remoteLogMap == null) {
- return;
- }
- lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
- } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
- //need to load the LSN mapping from disk
- Map<Long, Long> lsmMap = replicaResourcesManager
- .getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(replicaResourcesManager));
- Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN());
- if (mappingLSN == null) {
- /**
- * this shouldn't happen unless this node just recovered and
- * the first component it received is a merged component due
- * to an on-going merge operation while recovery on the remote
- * replica. In this case, we use the current append LSN since
- * no new records exist for this index, otherwise they would've
- * been flushed. This could be prevented by waiting for any IO
- * to finish on the remote replica during recovery.
- */
- mappingLSN = logManager.getAppendLSN();
- }
- lsmCompProp.setReplicaLSN(mappingLSN);
+ final String componentFilePath = syncTask.getComponentFilePath();
+ final ResourceReference indexRef = ResourceReference.of(componentFilePath);
+ final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(indexRef);
+ synchronized (indexCheckpointManager) {
+ long masterLsn = lsmCompProp.getOriginalLSN();
+ // wait until the lsn mapping is flushed to disk
+ while (!indexCheckpointManager.isFlushed(masterLsn)) {
+ indexCheckpointManager.wait();
}
- }
-
- if (Files.notExists(path)) {
- /**
- * This could happen when a merged component arrives and deletes
- * the flushed component (which we are trying to update) before
- * its flush log arrives since logs and components are received
- * on different threads.
- */
- return;
- }
-
- File destFile = new File(syncTask.getComponentFilePath());
- //prepare local LSN buffer
- ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
- metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
- metadataBuffer.flip();
-
- //replace the remote LSN value by the local one
- try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw");
- FileChannel fileChannel = fileOutputStream.getChannel()) {
- long lsnStartOffset = syncTask.getLSNByteOffset();
- while (metadataBuffer.hasRemaining()) {
- lsnStartOffset += fileChannel.write(metadataBuffer, lsnStartOffset);
- }
- fileChannel.force(true);
+ indexCheckpointManager
+ .replicated(AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()), masterLsn);
}
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 48c7083..5cf7eab 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -54,7 +54,6 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -63,6 +62,8 @@
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
@@ -84,7 +85,6 @@
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.util.StorageUtil;
import org.apache.hyracks.util.StorageUtil.StorageUnit;
@@ -136,6 +136,7 @@
private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
private final IReplicationStrategy replicationStrategy;
private final PersistentLocalResourceRepository localResourceRepo;
+ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
//TODO this class needs to be refactored by moving its private classes to separate files
//and possibly using MessageBroker to send/receive remote replicas events.
@@ -148,6 +149,8 @@
this.replicaResourcesManager = (ReplicaResourcesManager) remoteResoucesManager;
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
this.logManager = logManager;
+ this.indexCheckpointManagerProvider =
+ asterixAppRuntimeContextProvider.getAppContext().getIndexCheckpointManagerProvider();
localResourceRepo =
(PersistentLocalResourceRepository) asterixAppRuntimeContextProvider.getLocalResourceRepository();
this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
@@ -284,7 +287,6 @@
if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
return;
}
-
int jobPartitionId = indexFileRef.getPartitionId();
ByteBuffer responseBuffer = null;
@@ -327,25 +329,10 @@
FileChannel fileChannel = fromFile.getChannel();) {
long fileSize = fileChannel.size();
-
- if (LSMComponentJob != null) {
- /**
- * since this is LSM_COMPONENT REPLICATE job, the job will contain
- * only the component being replicated.
- */
- ILSMDiskComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext()
- .getComponentsToBeReplicated().get(0);
- long lsnOffset = LSMIndexUtil.getComponentFileLSNOffset(LSMComponentJob.getLSMIndex(),
- diskComponent, filePath);
- asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
- lsnOffset, remainingFiles == 0);
- } else {
- asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, -1L,
- remainingFiles == 0);
- }
+ asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
+ remainingFiles == 0);
requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
asterixFileProperties, ReplicationRequestType.REPLICATE_FILE);
-
Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SocketChannel> entry = iterator.next();
@@ -378,8 +365,7 @@
} else if (job.getOperation() == ReplicationOperation.DELETE) {
for (String filePath : job.getJobFiles()) {
remainingFiles--;
- asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, -1L,
- remainingFiles == 0);
+ asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, remainingFiles == 0);
ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
ReplicationRequestType.DELETE_FILE);
@@ -1026,10 +1012,10 @@
ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
for (String replicaId : replicaIds) {
//1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
- Map<Long, String> laggingIndexes =
+ Map<Long, DatasetResourceReference> laggingIndexes =
replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId, nonSharpCheckpointTargetLSN);
- if (laggingIndexes.size() > 0) {
+ if (!laggingIndexes.isEmpty()) {
//2. send request to remote replicas that have lagging indexes.
ReplicaIndexFlushRequest laggingIndexesResponse = null;
try (SocketChannel socketChannel = getReplicaSocket(replicaId)) {
@@ -1052,16 +1038,14 @@
}
/*
- * 4. update the LSN_MAP for indexes that were not flushed
+ * 4. update checkpoints for indexes that were not flushed
* to the current append LSN to indicate no operations happened
* since the checkpoint start.
*/
if (laggingIndexesResponse != null) {
for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
- String indexPath = laggingIndexes.get(resouceId);
- Map<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
- indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN);
- replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap);
+ final DatasetResourceReference datasetResourceReference = laggingIndexes.get(resouceId);
+ indexCheckpointManagerProvider.get(datasetResourceReference).advanceLowWatermark(startLSN);
}
}
}
@@ -1141,12 +1125,11 @@
fileChannel.force(true);
}
- //we need to create LSN map for .metadata files that belong to remote replicas
+ //we need to create initial map for .metadata files that belong to remote replicas
if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) {
- //replica index
- replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
+ final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath());
+ indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN());
}
-
responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
index f11adc2..08c0ec7 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
@@ -19,33 +19,21 @@
package org.apache.asterix.replication.storage;
public class LSMComponentLSNSyncTask {
+
private String componentFilePath;
private String componentId;
- private long LSNByteOffset;
- public LSMComponentLSNSyncTask(String componentId, String componentFilePath, long LSNByteOffset) {
+ public LSMComponentLSNSyncTask(String componentId, String componentFilePath) {
this.componentId = componentId;
this.componentFilePath = componentFilePath;
- this.LSNByteOffset = LSNByteOffset;
}
public String getComponentFilePath() {
return componentFilePath;
}
- public void setComponentFilePath(String componentFilePath) {
- this.componentFilePath = componentFilePath;
- }
-
public String getComponentId() {
return componentId;
}
- public void setComponentId(String componentId) {
- this.componentId = componentId;
- }
-
- public long getLSNByteOffset() {
- return LSNByteOffset;
- }
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 7ca6f2f..bf987d0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -52,9 +52,10 @@
this.nodeId = nodeId;
componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]);
numberOfFiles = new AtomicInteger(job.getJobFiles().size());
- originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
- job.getLSMIndexOperationContext());
opType = job.getLSMOpType();
+ originalLSN = opType == LSMOperationType.FLUSH ?
+ LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
+ job.getLSMIndexOperationContext()) : 0;
}
public LSMComponentProperties() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index f2747fe..2ebf2cb 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -31,27 +31,25 @@
private boolean lsmComponentFile;
private String filePath;
private boolean requiresAck = false;
- private long LSNByteOffset;
public LSMIndexFileProperties() {
}
public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
- long LSNByteOffset, boolean requiresAck) {
- initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
+ boolean requiresAck) {
+ initialize(filePath, fileSize, nodeId, lsmComponentFile, requiresAck);
}
public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
- initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, -1L, false);
+ initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, false);
}
- public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
+ public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
boolean requiresAck) {
this.filePath = filePath;
this.fileSize = fileSize;
this.nodeId = nodeId;
this.lsmComponentFile = lsmComponentFile;
- this.LSNByteOffset = LSNByteOffset;
this.requiresAck = requiresAck;
}
@@ -61,7 +59,6 @@
dos.writeUTF(filePath);
dos.writeLong(fileSize);
dos.writeBoolean(lsmComponentFile);
- dos.writeLong(LSNByteOffset);
dos.writeBoolean(requiresAck);
}
@@ -70,10 +67,9 @@
String filePath = input.readUTF();
long fileSize = input.readLong();
boolean lsmComponentFile = input.readBoolean();
- long LSNByteOffset = input.readLong();
boolean requiresAck = input.readBoolean();
- LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile,
- LSNByteOffset, requiresAck);
+ LSMIndexFileProperties fileProp =
+ new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile, requiresAck);
return fileProp;
}
@@ -108,11 +104,6 @@
sb.append("File Size: " + fileSize + " ");
sb.append("Node ID: " + nodeId + " ");
sb.append("isLSMComponentFile : " + lsmComponentFile + " ");
- sb.append("LSN Byte Offset: " + LSNByteOffset);
return sb.toString();
}
-
- public long getLSNByteOffset() {
- return LSNByteOffset;
- }
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 7eea4a4..2ff74a8 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -19,27 +19,26 @@
package org.apache.asterix.replication.storage;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -50,16 +49,15 @@
import org.apache.hyracks.storage.common.LocalResource;
public class ReplicaResourcesManager implements IReplicaResourcesManager {
- private static final Logger LOGGER = Logger.getLogger(ReplicaResourcesManager.class.getName());
- public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask";
- private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP";
- public static final long REPLICA_INDEX_CREATION_LSN = -1;
+ public static final String LSM_COMPONENT_MASK_SUFFIX = "_mask";
private final PersistentLocalResourceRepository localRepository;
private final Map<String, ClusterPartition[]> nodePartitions;
+ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
- public ReplicaResourcesManager(ILocalResourceRepository localRepository,
- MetadataProperties metadataProperties) {
+ public ReplicaResourcesManager(ILocalResourceRepository localRepository, MetadataProperties metadataProperties,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.localRepository = (PersistentLocalResourceRepository) localRepository;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
nodePartitions = metadataProperties.getNodePartitions();
}
@@ -86,12 +84,6 @@
return indexPath.toString();
}
- public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException {
- HashMap<Long, Long> lsnMap = new HashMap<Long, Long>();
- lsnMap.put(REPLICA_INDEX_CREATION_LSN, currentLSN);
- updateReplicaIndexLSNMap(indexPath, lsnMap);
- }
-
public void createRemoteLSMComponentMask(LSMComponentProperties lsmComponentProperties) throws IOException {
String maskPath = lsmComponentProperties.getMaskPath(this);
Path path = Paths.get(maskPath);
@@ -106,13 +98,6 @@
String maskPath = lsmComponentProperties.getMaskPath(this);
Path path = Paths.get(maskPath);
Files.deleteIfExists(path);
-
- //add component LSN to the index LSNs map
- Map<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
- lsnMap.put(lsmComponentProperties.getOriginalLSN(), lsmComponentProperties.getReplicaLSN());
-
- //update map on disk
- updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), lsnMap);
}
public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException {
@@ -128,58 +113,37 @@
public long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException {
long minRemoteLSN = Long.MAX_VALUE;
for (Integer partition : partitions) {
- //for every index in replica
- Set<File> remoteIndexes = localRepository.getPartitionIndexes(partition);
- for (File indexFolder : remoteIndexes) {
- //read LSN map
- try {
- //get max LSN per index
- long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
-
- //get min of all maximums
- minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
- } catch (IOException e) {
- LOGGER.log(Level.INFO,
- indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
- continue;
- }
+ final List<DatasetResourceReference> partitionResources = localRepository.getResources(resource -> {
+ DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+ return dsResource.getPartition() == partition;
+ }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+ for (DatasetResourceReference indexRef : partitionResources) {
+ long remoteIndexMaxLSN = indexCheckpointManagerProvider.get(indexRef).getLowWatermark();
+ minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
}
}
return minRemoteLSN;
}
- public Map<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) throws IOException {
- Map<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
- try {
- //for every index in replica
- Set<File> remoteIndexes = getReplicaIndexes(replicaId);
- for (File indexFolder : remoteIndexes) {
- if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
- File localResource = new File(
- indexFolder + File.separator + StorageConstants.METADATA_FILE_NAME);
- LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
- laggingReplicaIndexes.put(resource.getId(), indexFolder.getAbsolutePath());
+ public Map<Long, DatasetResourceReference> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN)
+ throws HyracksDataException {
+ Map<Long, DatasetResourceReference> laggingReplicaIndexes = new HashMap<>();
+ final List<Integer> replicaPartitions =
+ Arrays.stream(nodePartitions.get(replicaId)).map(ClusterPartition::getPartitionId)
+ .collect(Collectors.toList());
+ for (int patition : replicaPartitions) {
+ final Map<Long, LocalResource> partitionResources = localRepository.getPartitionResources(patition);
+ final List<DatasetResourceReference> indexesRefs =
+ partitionResources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+ for (DatasetResourceReference ref : indexesRefs) {
+ if (indexCheckpointManagerProvider.get(ref).getLowWatermark() < targetLSN) {
+ laggingReplicaIndexes.put(ref.getResourceId(), ref);
}
}
- } catch (HyracksDataException e) {
- e.printStackTrace();
}
-
return laggingReplicaIndexes;
}
- private long getReplicaIndexMaxLSN(File indexFolder) throws IOException {
- long remoteIndexMaxLSN = 0;
- //get max LSN per index
- Map<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
- if (lsnMap != null) {
- for (Long lsn : lsnMap.values()) {
- remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn);
- }
- }
- return remoteIndexMaxLSN;
- }
-
public void cleanInvalidLSMComponents(String replicaId) {
//for every index in replica
Set<File> remoteIndexes = null;
@@ -214,28 +178,6 @@
}
}
- @SuppressWarnings({ "unchecked" })
- public synchronized Map<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
- try (FileInputStream fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
- ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
- Map<Long, Long> lsnMap = null;
- try {
- lsnMap = (Map<Long, Long>) oisFromFis.readObject();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- return lsnMap;
- }
- }
-
- public synchronized void updateReplicaIndexLSNMap(String indexPath, Map<Long, Long> lsnMap) throws IOException {
- try (FileOutputStream fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
- ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
- oosToFos.writeObject(lsnMap);
- oosToFos.flush();
- }
- }
-
/**
* @param partition
* @return Absolute paths to all partition files
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index e87a39b..ba43fb7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -29,6 +29,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -38,8 +39,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.MetadataProperties;
@@ -48,10 +47,12 @@
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
@@ -72,7 +73,29 @@
public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME);
// Private constants
private static final int MAX_CACHED_RESOURCES = 1000;
- private static final int RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 6;
+ private static final IOFileFilter METADATA_FILES_FILTER = new IOFileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.getName().equals(StorageConstants.METADATA_FILE_NAME);
+ }
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return false;
+ }
+ };
+
+ private static final IOFileFilter ALL_DIR_FILTER = new IOFileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return true;
+ }
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return true;
+ }
+ };
// Finals
private final IIOManager ioManager;
@@ -85,15 +108,17 @@
private IReplicationManager replicationManager;
private Set<Integer> nodeInactivePartitions;
private final Path[] storageRoots;
+ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
- public PersistentLocalResourceRepository(IIOManager ioManager, String nodeId,
- MetadataProperties metadataProperties) {
+ public PersistentLocalResourceRepository(IIOManager ioManager, String nodeId, MetadataProperties metadataProperties,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.ioManager = ioManager;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
storageRoots = new Path[ioManager.getIODevices().size()];
final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
for (int i = 0; i < ioDevices.size(); i++) {
- storageRoots[i] =
- Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME);
+ storageRoots[i] = Paths.get(ioDevices.get(i).getMount().getAbsolutePath(),
+ StorageConstants.STORAGE_ROOT_DIR_NAME);
}
createStorageRoots();
resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -119,7 +144,7 @@
}
@Override
- public LocalResource get(String relativePath) throws HyracksDataException {
+ public synchronized LocalResource get(String relativePath) throws HyracksDataException {
LocalResource resource = resourceCache.getIfPresent(relativePath);
if (resource == null) {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
@@ -153,7 +178,7 @@
}
resourceCache.put(resource.getPath(), resource);
-
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(0);
//if replication enabled, send resource metadata info to remote nodes
if (isReplicationEnabled) {
createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
@@ -164,18 +189,16 @@
public synchronized void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
if (resourceFile.getFile().exists()) {
- try {
- // Invalidate before deleting the file just in case file deletion throws some exception.
- // Since it's just a cache invalidation, it should not affect correctness.
- resourceCache.invalidate(relativePath);
- IoUtil.delete(resourceFile);
- } finally {
- // Regardless of successfully deleted or not, the operation should be replicated.
- //if replication enabled, delete resource from remote replicas
- if (isReplicationEnabled) {
- createReplicationJob(ReplicationOperation.DELETE, resourceFile);
- }
+ if (isReplicationEnabled) {
+ createReplicationJob(ReplicationOperation.DELETE, resourceFile);
}
+ // delete all checkpoints
+ final LocalResource localResource = readLocalResource(resourceFile.getFile());
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
+ // Invalidate before deleting the file just in case file deletion throws some exception.
+ // Since it's just a cache invalidation, it should not affect correctness.
+ resourceCache.invalidate(relativePath);
+ IoUtil.delete(resourceFile);
} else {
throw HyracksDataException
.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath);
@@ -188,13 +211,13 @@
return ioManager.resolve(fileName);
}
- public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException {
+ public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter)
+ throws HyracksDataException {
Map<Long, LocalResource> resourcesMap = new HashMap<>();
for (Path root : storageRoots) {
- try (Stream<Path> stream = Files.find(root, RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT,
- (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) {
- final List<File> resourceMetadataFiles = stream.map(Path::toFile).collect(Collectors.toList());
- for (File file : resourceMetadataFiles) {
+ final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER);
+ try {
+ for (File file : files) {
final LocalResource localResource = PersistentLocalResourceRepository.readLocalResource(file);
if (filter.test(localResource)) {
resourcesMap.put(localResource.getId(), localResource);
@@ -321,6 +344,13 @@
return indexes;
}
+ public Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException {
+ return getResources(resource -> {
+ DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+ return dsResource.getPartition() == partition;
+ });
+ }
+
/**
* Given any index file, an absolute {@link FileReference} is returned which points to where the index of
* {@code indexFile} is located.
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index 43024b6..33c6260 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.transaction.management.resource;
import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
@@ -28,16 +29,19 @@
private final IIOManager ioManager;
private final String nodeId;
private final MetadataProperties metadataProperties;
+ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId,
- MetadataProperties metadataProperties) {
+ MetadataProperties metadataProperties, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.ioManager = ioManager;
this.nodeId = nodeId;
this.metadataProperties = metadataProperties;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
}
@Override
public ILocalResourceRepository createRepository() throws HyracksDataException {
- return new PersistentLocalResourceRepository(ioManager, nodeId, metadataProperties);
+ return new PersistentLocalResourceRepository(ioManager, nodeId, metadataProperties,
+ indexCheckpointManagerProvider);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 6ebf52c..96a31c6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -111,8 +111,8 @@
logRecord.isFlushed(false);
flushQ.add(logRecord);
}
- } else if (logRecord.getLogSource() == LogSource.REMOTE
- && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
+ } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
+ || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.FLUSH)) {
remoteJobsQ.add(logRecord);
}
this.notify();
@@ -260,10 +260,9 @@
} else if (logRecord.getLogType() == LogType.WAIT) {
notifyWaitTermination();
}
- } else if (logRecord.getLogSource() == LogSource.REMOTE) {
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
- notifyReplicationTermination();
- }
+ } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
+ || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.FLUSH)) {
+ notifyReplicationTermination();
}
logRecord = logBufferTailReader.next();
}
@@ -295,10 +294,12 @@
public void notifyFlushTermination() throws ACIDException {
LogRecord logRecord = null;
- try {
- logRecord = (LogRecord) flushQ.take();
- } catch (InterruptedException e) {
- //ignore
+ while (logRecord == null) {
+ try {
+ logRecord = (LogRecord) flushQ.take();
+ } catch (InterruptedException e) { //NOSONAR LogFlusher should survive interrupts
+ //ignore
+ }
}
synchronized (logRecord) {
logRecord.isFlushed(true);
@@ -316,10 +317,12 @@
public void notifyReplicationTermination() {
LogRecord logRecord = null;
- try {
- logRecord = (LogRecord) remoteJobsQ.take();
- } catch (InterruptedException e) {
- //ignore
+ while (logRecord == null) {
+ try {
+ logRecord = (LogRecord) remoteJobsQ.take();
+ } catch (InterruptedException e) { //NOSONAR LogFlusher should survive interrupts
+ //ignore
+ }
}
logRecord.isFlushed(true);
IReplicationThread replicationThread = logRecord.getReplicationThread();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index d3c056d..a6ceba8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -411,4 +411,8 @@
prevTimestamp = ts;
return ts;
}
+
+ public static String getComponentEndTime(String fileName) {
+ return fileName.split(DELIMITER)[1];
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 393aa6a..5368591 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -560,22 +560,31 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Started a flush operation for index: " + lsmIndex + " ...");
}
-
- ILSMDiskComponent newComponent = null;
- boolean failedOperation = false;
try {
- newComponent = lsmIndex.flush(operation);
- operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent);
- newComponent.markAsValid(lsmIndex.isDurable());
- } catch (Throwable e) { // NOSONAR Log and re-throw
- failedOperation = true;
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e);
+ ILSMDiskComponent newComponent = null;
+ boolean failedOperation = false;
+ try {
+ newComponent = lsmIndex.flush(operation);
+ operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent);
+ newComponent.markAsValid(lsmIndex.isDurable());
+ } catch (Throwable e) { // NOSONAR Log and re-throw
+ failedOperation = true;
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e);
+ }
+ throw e;
+ } finally {
+ exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
+ operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent);
+
}
- throw e;
} finally {
- exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
- operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent);
+ /*
+ * Completion of flush/merge operations is done explicitly here to make sure all generated files during
+ * io operations is completed before the io operation is declared complete
+ */
+ opTracker.completeOperation(lsmIndex, LSMOperationType.FLUSH, ctx.getSearchOperationCallback(),
+ ctx.getModificationCallback());
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished the flush operation for index: " + lsmIndex);
@@ -612,37 +621,42 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
}
-
- ILSMDiskComponent newComponent = null;
- boolean failedOperation = false;
try {
- newComponent = lsmIndex.merge(operation);
- operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
- newComponent.markAsValid(lsmIndex.isDurable());
- } catch (Throwable e) { // NOSONAR: Log and re-throw
- failedOperation = true;
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e);
+ ILSMDiskComponent newComponent = null;
+ boolean failedOperation = false;
+ try {
+ newComponent = lsmIndex.merge(operation);
+ operation.getCallback()
+ .afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
+ newComponent.markAsValid(lsmIndex.isDurable());
+ } catch (Throwable e) { // NOSONAR: Log and re-throw
+ failedOperation = true;
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e);
+ }
+ throw e;
+ } finally {
+ exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
+ operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
}
- throw e;
} finally {
- exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
- // Completion of the merge operation is called here to and not on afterOperation because
- // Deletion of the old components comes after afterOperation is called and the number of
- // io operation should not be decremented before the operation is complete to avoid
- // index destroy from competing with the merge on deletion of the files.
- // The order becomes:
- // 1. scheduleMerge
- // 2. enterComponents
- // 3. beforeOperation (increment the numOfIoOperations)
- // 4. merge
- // 5. exitComponents
- // 6. afterOperation (no op)
- // 7. delete components
- // 8. completeOperation (decrement the numOfIoOperations)
+ /*
+ * Completion of the merge operation is called here to and not on afterOperation because
+ * deletion of old components comes after afterOperation is called and the number of
+ * io operation should not be decremented before the operation is complete to avoid
+ * index destroy from competing with the merge on deletion of the files.
+ * The order becomes:
+ * 1. scheduleMerge
+ * 2. enterComponents
+ * 3. beforeOperation (increment the numOfIoOperations)
+ * 4. merge
+ * 5. exitComponents
+ * 6. afterOperation (no op)
+ * 7. delete components
+ * 8. completeOperation (decrement the numOfIoOperations)
+ */
opTracker.completeOperation(lsmIndex, LSMOperationType.MERGE, ctx.getSearchOperationCallback(),
ctx.getModificationCallback());
- operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
}
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Finished the merge operation for index: " + lsmIndex);