Introduce CheckpointManager API
This change includes the following:
- s/CheckpointObject/Checkpoint
- Add AsterixDB storage version to checkpoints.
- Prevent any txn log access when a storage version mismatch is detected.
- Introduce CheckpointManager API and CheckpointProperties.
- Properly stop checkpointing thread on instance shutdown.
- Separate checkpointing logic when replication enabled/disabled.
Change-Id: I36c00ca195b93bbe1e53f39bb4a3b5a344657f0d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1380
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index a3ae9a0..b1ca062 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -278,7 +278,7 @@
lccm.register((ILifeCycleComponent) datasetLifecycleManager);
lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
-
+ lccm.register(txnSubsystem.getCheckpointManager());
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 1887f2e..8998c6b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.utils.PrintUtil;
@@ -47,7 +48,6 @@
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
@@ -249,8 +249,8 @@
lccm.startAll();
if (!pendingFailbackCompletion) {
- IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
+ ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager();
+ checkpointMgr.doSharpCheckpoint();
if (isMetadataNode) {
runtimeContext.exportMetadataNodeStub();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index deb53ad..9fbf850 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.configuration.Property;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.external.util.DataflowUtils;
@@ -125,6 +126,7 @@
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager();
+ ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
// Number of log files after node startup should be one
int numberOfLogFiles = logManager.getLogFileIds().size();
@@ -154,7 +156,7 @@
* recovery)
*/
int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size();
- recoveryManager.checkpoint(false, logManager.getAppendLSN());
+ checkpointManager.tryCheckpoint(logManager.getAppendLSN());
int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size();
Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint);
@@ -175,7 +177,7 @@
* At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
* a checkpoint should delete it.
*/
- recoveryManager.checkpoint(false, recoveryManager.getMinFirstLSN());
+ checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN());
// Validate initialLowWaterMarkFileId was deleted
for (Long fileId : logManager.getLogFileIds()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
new file mode 100644
index 0000000..8bbdab7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.io.Serializable;
+
+public class Checkpoint implements Serializable, Comparable<Checkpoint> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final long checkpointLsn;
+ private final long minMCTFirstLsn;
+ private final int maxJobId;
+ private final long timeStamp;
+ private final boolean sharp;
+ private final int storageVersion;
+
+ public Checkpoint(long checkpointLsn, long minMCTFirstLsn, int maxJobId, long timeStamp, boolean sharp,
+ int storageVersion) {
+ this.checkpointLsn = checkpointLsn;
+ this.minMCTFirstLsn = minMCTFirstLsn;
+ this.maxJobId = maxJobId;
+ this.timeStamp = timeStamp;
+ this.sharp = sharp;
+ this.storageVersion = storageVersion;
+ }
+
+ public long getCheckpointLsn() {
+ return checkpointLsn;
+ }
+
+ public long getMinMCTFirstLsn() {
+ return minMCTFirstLsn;
+ }
+
+ public int getMaxJobId() {
+ return maxJobId;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ public boolean isSharp() {
+ return sharp;
+ }
+
+ public int getStorageVersion() {
+ return storageVersion;
+ }
+
+ @Override
+ public int compareTo(Checkpoint checkpoint) {
+ long compareTimeStamp = checkpoint.getTimeStamp();
+
+ // Descending order
+ long diff = compareTimeStamp - this.timeStamp;
+ if (diff > 0) {
+ return 1;
+ } else if (diff == 0) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof Checkpoint)) {
+ return false;
+ }
+ Checkpoint other = (Checkpoint) obj;
+ return compareTo(other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32));
+ result = prime * result + maxJobId;
+ result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32));
+ result = prime * result + (sharp ? 1231 : 1237);
+ result = prime * result + storageVersion;
+ result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32));
+ return result;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
new file mode 100644
index 0000000..b8af3a6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.asterix.common.config.TransactionProperties;
+
+public class CheckpointProperties {
+
+ private final String checkpointDirPath;
+ private final int lsnThreshold;
+ private final int pollFrequency;
+ private final int historyToKeep;
+
+ public CheckpointProperties(TransactionProperties txnProperties, String nodeId) {
+ // Currently we use the log files directory for checkpoints
+ checkpointDirPath = txnProperties.getLogDirectory(nodeId);
+ lsnThreshold = txnProperties.getCheckpointLSNThreshold();
+ pollFrequency = txnProperties.getCheckpointPollFrequency();
+ historyToKeep = txnProperties.getCheckpointHistory();
+ }
+
+ public int getLsnThreshold() {
+ return lsnThreshold;
+ }
+
+ public int getPollFrequency() {
+ return pollFrequency;
+ }
+
+ public int getHistoryToKeep() {
+ return historyToKeep;
+ }
+
+ public String getCheckpointDirPath() {
+ return checkpointDirPath;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
new file mode 100644
index 0000000..9e7eb0d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+
+public interface ICheckpointManager extends ILifeCycleComponent {
+
+ /**
+ * @return The latest checkpoint on disk if any exists. Otherwise null.
+ * @throws ACIDException
+ * when a checkpoint file cannot be read.
+ */
+ Checkpoint getLatest() throws ACIDException;
+
+ /**
+ * Performs a sharp checkpoint.
+ *
+ * @throws HyracksDataException
+ */
+ void doSharpCheckpoint() throws HyracksDataException;
+
+ /**
+ * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}.
+ *
+ * @param checkpointTargetLSN
+ * @return The LSN recorded on the captured checkpoint.
+ * @throws HyracksDataException
+ */
+ long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index 97d4897..aa018ba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -105,4 +105,9 @@
* @throws IOException
*/
public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException;
+
+ /**
+ * Deletes all current log files and start the next log file partition
+ */
+ void renewLogFiles();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index a3115e7..6816116 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -82,20 +82,6 @@
public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
/**
- * Makes a system checkpoint.
- *
- * @param isSharpCheckpoint
- * a flag indicating whether to perform a sharp or non-sharp checkpoint.
- * @param nonSharpCheckpointTargetLSN
- * if a non-sharp checkpoint to be performed, what is the minimum LSN it should target.
- * @return the LSN at which the checkpoint was performed.
- * @throws ACIDException
- * @throws HyracksDataException
- */
- public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
- throws ACIDException, HyracksDataException;
-
- /**
* @return min first LSN of the open indexes (including remote indexes if replication is enabled)
* @throws HyracksDataException
*/
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index c81faf0..af056ae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -124,4 +124,9 @@
*/
public ITransactionSubsystem getTransactionProvider();
+ /**
+ * @return The current max job id.
+ */
+ int getMaxJobId();
+
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
index dc1e6ed..b3a3eba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.common.transactions;
-
public interface ITransactionSubsystem {
public ILogManager getLogManager();
@@ -33,4 +32,5 @@
public String getId();
+ public ICheckpointManager getCheckpointManager();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 382c94b..a885e93 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -18,12 +18,19 @@
*/
package org.apache.asterix.common.utils;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+
/**
* A static class that stores storage constants
*/
public class StorageConstants {
public static final String METADATA_ROOT = "root_metadata";
+ /** The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). */
+ private static final int LOCAL_STORAGE_VERSION = 1;
+
+ /** The storage version of AsterixDB stack. */
+ public static final int VERSION = LOCAL_STORAGE_VERSION + ITreeIndexMetaDataFrame.VERSION;
private StorageConstants() {
}
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 57d5c39..25096f6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -351,7 +351,8 @@
return logFileSize * fileId + offset;
}
- public void renewLogFiles() throws IOException {
+ @Override
+ public void renewLogFiles() {
terminateLogFlusher();
long lastMaxLogFileId = deleteAllLogFiles();
initializeLogManager(lastMaxLogFileId + 1);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
new file mode 100644
index 0000000..0b86ea5
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An abstract implementation of {@link ICheckpointManager}.
+ * The AbstractCheckpointManager contains the implementation of
+ * the base operations on checkpoints such as persisting and deleting them.
+ */
+public abstract class AbstractCheckpointManager implements ICheckpointManager {
+
+ private static final Logger LOGGER = Logger.getLogger(AbstractCheckpointManager.class.getName());
+ private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
+ public static final long SHARP_CHECKPOINT_LSN = -1;
+ private static final FilenameFilter filter = (File dir, String name) -> name.startsWith(CHECKPOINT_FILENAME_PREFIX);
+ private final File checkpointDir;
+ private final int historyToKeep;
+ private final int lsnThreshold;
+ private final int pollFrequency;
+ protected final ITransactionSubsystem txnSubsystem;
+ private CheckpointThread checkpointer;
+
+ public AbstractCheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
+ this.txnSubsystem = txnSubsystem;
+ String checkpointDirPath = checkpointProperties.getCheckpointDirPath();
+ if (!checkpointDirPath.endsWith(File.separator)) {
+ checkpointDirPath += File.separator;
+ }
+ checkpointDir = new File(checkpointDirPath);
+ // Create the checkpoint directory if missing
+ if (!checkpointDir.exists()) {
+ (new File(checkpointDirPath)).mkdir();
+ }
+ lsnThreshold = checkpointProperties.getLsnThreshold();
+ pollFrequency = checkpointProperties.getPollFrequency();
+ // We must keep at least the latest checkpoint
+ historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 1 : checkpointProperties.getHistoryToKeep();
+ }
+
+ @Override
+ public Checkpoint getLatest() throws ACIDException {
+ // Read all checkpointObjects from the existing checkpoint files
+ File[] checkpoints = checkpointDir.listFiles(filter);
+ if (checkpoints == null || checkpoints.length == 0) {
+ return null;
+ }
+
+ Checkpoint checkpointObject;
+ List<Checkpoint> checkpointObjectList = new ArrayList<>();
+ for (File file : checkpoints) {
+ try (FileInputStream fis = new FileInputStream(file);
+ ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
+ checkpointObject = (Checkpoint) oisFromFis.readObject();
+ checkpointObjectList.add(checkpointObject);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new ACIDException("Failed to read a checkpoint file", e);
+ }
+ }
+ // Sort checkpointObjects in descending order by timeStamp to find out the most recent one.
+ Collections.sort(checkpointObjectList);
+
+ // Return the most recent one (the first one in sorted list)
+ return checkpointObjectList.get(0);
+ }
+
+ @Override
+ public void start() {
+ checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
+ checkpointer.start();
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+ checkpointer.shutdown();
+ checkpointer.interrupt();
+ try {
+ // Wait until checkpoint thread stops
+ checkpointer.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void dumpState(OutputStream os) throws IOException {
+ // Nothing to dump
+ }
+
+ protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
+ ILogManager logMgr = txnSubsystem.getLogManager();
+ ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
+ Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxJobId(),
+ System.currentTimeMillis(), sharp, StorageConstants.VERSION);
+ persist(checkpointObject);
+ cleanup();
+ }
+
+ private void persist(Checkpoint checkpoint) throws HyracksDataException {
+ // Construct checkpoint file name
+ String fileName = checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
+ + Long.toString(checkpoint.getTimeStamp());
+ //TODO: replace java serialization
+ // Write checkpoint file to disk
+ try (FileOutputStream fos = new FileOutputStream(fileName);
+ ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
+ oosToFos.writeObject(checkpoint);
+ oosToFos.flush();
+ } catch (IOException e) {
+ throw new HyracksDataException("Failed to write checkpoint to disk", e);
+ }
+ }
+
+ private void cleanup() {
+ File[] checkpointFiles = checkpointDir.listFiles(filter);
+ // Sort the filenames lexicographically to keep the latest checkpoint history files.
+ Arrays.sort(checkpointFiles);
+ for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) {
+ if (!checkpointFiles[i].delete()) {
+ LOGGER.warning("Could not delete checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
new file mode 100644
index 0000000..ea711a5
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of {@link ICheckpointManager} that defines the logic
+ * of checkpoints.
+ */
+public class CheckpointManager extends AbstractCheckpointManager {
+
+ private static final Logger LOGGER = Logger.getLogger(CheckpointManager.class.getName());
+
+ public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
+ super(txnSubsystem, checkpointProperties);
+ }
+
+ /**
+ * Performs a sharp checkpoint. All datasets are flushed and all transaction
+ * log files are deleted.
+ */
+ @Override
+ public synchronized void doSharpCheckpoint() throws HyracksDataException {
+ LOGGER.info("Starting sharp checkpoint...");
+ final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+ datasetLifecycleManager.flushAllDatasets();
+ capture(SHARP_CHECKPOINT_LSN, true);
+ txnSubsystem.getLogManager().renewLogFiles();
+ LOGGER.info("Completed sharp checkpoint.");
+ }
+
+ /***
+ * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}.
+ * If a checkpoint cannot be captured due to datasets having LSN < {@code checkpointTargetLSN},
+ * an asynchronous flush is triggered on them. When a checkpoint is successful, all transaction
+ * log files that end with LSN < {@code checkpointTargetLSN} are deleted.
+ */
+ @Override
+ public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
+ LOGGER.info("Attemping soft checkpoint...");
+ final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+ boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
+ if (!checkpointSucceeded) {
+ // Flush datasets with indexes behind target checkpoint LSN
+ IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+ datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
+ }
+ capture(minFirstLSN, false);
+ if (checkpointSucceeded) {
+ txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+ LOGGER.info(String.format("soft checkpoint succeeded at LSN(%s)", minFirstLSN));
+ }
+ return minFirstLSN;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
new file mode 100644
index 0000000..68c5ce1
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+
+public class CheckpointManagerFactory {
+
+ private CheckpointManagerFactory() {
+ throw new AssertionError();
+ }
+
+ public static ICheckpointManager create(ITransactionSubsystem txnSubsystem,
+ CheckpointProperties checkpointProperties, boolean replicationEnabled) {
+ if (!replicationEnabled) {
+ return new CheckpointManager(txnSubsystem, checkpointProperties);
+ } else {
+ return new ReplicationCheckpointManager(txnSubsystem, checkpointProperties);
+ }
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java
deleted file mode 100644
index 3356298..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.recovery;
-
-import java.io.Serializable;
-
-public class CheckpointObject implements Serializable, Comparable<CheckpointObject> {
-
- private static final long serialVersionUID = 1L;
-
- private final long checkpointLsn;
- private final long minMCTFirstLsn;
- private final int maxJobId;
- private final long timeStamp;
- private final boolean sharp;
-
- public CheckpointObject(long checkpointLsn, long minMCTFirstLsn, int maxJobId, long timeStamp, boolean sharp) {
- this.checkpointLsn = checkpointLsn;
- this.minMCTFirstLsn = minMCTFirstLsn;
- this.maxJobId = maxJobId;
- this.timeStamp = timeStamp;
- this.sharp = sharp;
- }
-
- public long getCheckpointLsn() {
- return checkpointLsn;
- }
-
- public long getMinMCTFirstLsn() {
- return minMCTFirstLsn;
- }
-
- public int getMaxJobId() {
- return maxJobId;
- }
-
- public long getTimeStamp() {
- return timeStamp;
- }
-
- public boolean isSharp() {
- return sharp;
- }
-
- @Override
- public int compareTo(CheckpointObject checkpointObject) {
- long compareTimeStamp = checkpointObject.getTimeStamp();
-
- //decending order
- long diff = compareTimeStamp - this.timeStamp;
- if (diff > 0) {
- return 1;
- } else if (diff == 0) {
- return 0;
- } else {
- return -1;
- }
-
- //ascending order
- //return this.timeStamp - compareTimeStamp;
- }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index 851289e..39c7c98 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -18,15 +18,18 @@
*/
package org.apache.asterix.transaction.management.service.recovery;
-import java.io.IOError;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+/**
+ * A daemon thread that periodically attempts to perform checkpoints.
+ * A checkpoint attempt is made when the volume of transaction logs written
+ * since the last successful checkpoint exceeds a certain threshold.
+ */
public class CheckpointThread extends Thread {
private static final Logger LOGGER = Logger.getLogger(CheckpointThread.class.getName());
@@ -34,33 +37,34 @@
private long checkpointTermInSecs;
private final ILogManager logManager;
- private final IRecoveryManager recoveryMgr;
+ private final ICheckpointManager checkpointManager;
+ private volatile boolean shouldRun = true;
- public CheckpointThread(IRecoveryManager recoveryMgr, ILogManager logManager,
- long lsnThreshold, long checkpointTermInSecs) {
- this.recoveryMgr = recoveryMgr;
+ public CheckpointThread(ICheckpointManager checkpointManager, ILogManager logManager, long lsnThreshold,
+ long checkpointTermInSecs) {
+ this.checkpointManager = checkpointManager;
this.logManager = logManager;
this.lsnThreshold = lsnThreshold;
this.checkpointTermInSecs = checkpointTermInSecs;
+ setDaemon(true);
}
@Override
public void run() {
-
Thread.currentThread().setName("Checkpoint Thread");
-
long currentCheckpointAttemptMinLSN;
long lastCheckpointLSN = -1;
long currentLogLSN;
long targetCheckpointLSN;
- while (true) {
+ while (shouldRun) {
try {
sleep(checkpointTermInSecs * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- //ignore
}
-
+ if (!shouldRun) {
+ return;
+ }
if (lastCheckpointLSN == -1) {
try {
//Since the system just started up after sharp checkpoint,
@@ -84,18 +88,20 @@
//3. next time checkpoint comes, it will be able to remove log files which have end range less than current targetCheckpointLSN
targetCheckpointLSN = lastCheckpointLSN + lsnThreshold;
- currentCheckpointAttemptMinLSN = recoveryMgr.checkpoint(false, targetCheckpointLSN);
+ currentCheckpointAttemptMinLSN = checkpointManager.tryCheckpoint(targetCheckpointLSN);
//checkpoint was completed at target LSN or above
if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
lastCheckpointLSN = currentCheckpointAttemptMinLSN;
}
-
- } catch (ACIDException | HyracksDataException e) {
- throw new IOError(e);
+ } catch (HyracksDataException e) {
+ LOGGER.log(Level.SEVERE, "Error during checkpoint", e);
}
}
}
}
+ public void shutdown() {
+ shouldRun = false;
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 825e5c9..f8b6384 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -20,13 +20,9 @@
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -34,7 +30,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -48,15 +43,13 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
-import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ILogReader;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -66,7 +59,6 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
-import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -89,30 +81,22 @@
private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
private final TransactionSubsystem txnSubsystem;
private final LogManager logMgr;
- private final int checkpointHistory;
- private final long SHARP_CHECKPOINT_LSN = -1;
private final boolean replicationEnabled;
- public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
private final long cachedEntityCommitsPerJobSize;
private final PersistentLocalResourceRepository localResourceRepository;
-
- /**
- * A file at a known location that contains the LSN of the last log record
- * traversed doing a successful checkpoint.
- */
- private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
+ private final ICheckpointManager checkpointManager;
private SystemState state;
public RecoveryManager(TransactionSubsystem txnSubsystem) {
this.txnSubsystem = txnSubsystem;
logMgr = (LogManager) txnSubsystem.getLogManager();
- checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory();
replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
.getLocalResourceRepository();
cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
+ checkpointManager = txnSubsystem.getCheckpointManager();
}
/**
@@ -126,10 +110,8 @@
@Override
public SystemState getSystemState() throws ACIDException {
//read checkpoint file
- CheckpointObject checkpointObject = null;
- try {
- checkpointObject = readCheckpoint();
- } catch (FileNotFoundException e) {
+ Checkpoint checkpointObject = checkpointManager.getLatest();
+ if (checkpointObject == null) {
//The checkpoint file doesn't exist => Failure happened during NC initialization.
//Retry to initialize the NC by setting the state to NEW_UNIVERSE
state = SystemState.NEW_UNIVERSE;
@@ -140,7 +122,7 @@
}
if (replicationEnabled) {
- if (checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
+ if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
//no logs exist
state = SystemState.HEALTHY;
return state;
@@ -156,14 +138,14 @@
} else {
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
if (logMgr.getAppendLSN() == readableSmallestLSN) {
- if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
+ if (checkpointObject.getMinMCTFirstLsn() != AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
LOGGER.warning("Some(or all) of transaction log files are lost.");
//No choice but continuing when the log files are lost.
}
state = SystemState.HEALTHY;
return state;
} else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
- && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
+ && checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
state = SystemState.HEALTHY;
return state;
} else {
@@ -180,7 +162,7 @@
LOGGER.log(Level.INFO, "starting recovery ...");
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
- CheckpointObject checkpointObject = readCheckpoint();
+ Checkpoint checkpointObject = checkpointManager.getLatest();
long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
if (lowWaterMarkLSN < readableSmallestLSN) {
lowWaterMarkLSN = readableSmallestLSN;
@@ -372,8 +354,8 @@
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = index;
try {
- maxDiskLastLsn =
- ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+ maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
+ .getIOOperationCallback())
.getComponentLSN(lsmIndex.getImmutableComponents());
} catch (HyracksDataException e) {
datasetLifecycleManager.close(localResource.getPath());
@@ -422,129 +404,6 @@
}
@Override
- public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
- throws ACIDException, HyracksDataException {
- long minMCTFirstLSN;
- boolean nonSharpCheckpointSucceeded = false;
-
- if (isSharpCheckpoint) {
- LOGGER.log(Level.INFO, "Starting sharp checkpoint ... ");
- }
-
- TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
- String logDir = logMgr.getLogManagerProperties().getLogDir();
-
- //get the filename of the previous checkpoint files which are about to be deleted
- //right after the new checkpoint file is written.
- File[] prevCheckpointFiles = getPreviousCheckpointFiles();
-
- IDatasetLifecycleManager datasetLifecycleManager =
- txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
- //flush all in-memory components if it is the sharp checkpoint
- if (isSharpCheckpoint) {
- datasetLifecycleManager.flushAllDatasets();
- if (!replicationEnabled) {
- minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
- } else {
- //if is shutting down, need to check if we need to keep any remote logs for dead replicas
- if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) {
- Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
- .getReplicationManager().getDeadReplicasIds();
- if (deadReplicaIds.isEmpty()) {
- minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
- } else {
- //get min LSN of dead replicas remote resources
- IReplicaResourcesManager remoteResourcesManager = txnSubsystem
- .getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
- IPropertiesProvider propertiesProvider = (IPropertiesProvider) txnSubsystem
- .getAsterixAppRuntimeContextProvider().getAppContext();
- MetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
- Set<Integer> deadReplicasPartitions = new HashSet<>();
- //get partitions of the dead replicas that are not active on this node
- for (String deadReplicaId : deadReplicaIds) {
- ClusterPartition[] nodePartitons =
- metadataProperties.getNodePartitions().get(deadReplicaId);
- for (ClusterPartition partition : nodePartitons) {
- if (!localResourceRepository.getActivePartitions()
- .contains(partition.getPartitionId())) {
- deadReplicasPartitions.add(partition.getPartitionId());
- }
- }
- }
- minMCTFirstLSN = remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions);
- }
- } else {
- //start up complete checkpoint. Avoid deleting remote recovery logs.
- minMCTFirstLSN = getMinFirstLSN();
- }
- }
- } else {
- minMCTFirstLSN = getMinFirstLSN();
- if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
- nonSharpCheckpointSucceeded = true;
- } else {
- //flush datasets with indexes behind target checkpoint LSN
- datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
- if (replicationEnabled) {
- //request remote replicas to flush lagging indexes
- IReplicationManager replicationManager =
- txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager();
- try {
- replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
- txnMgr.getMaxJobId(), System.currentTimeMillis(), isSharpCheckpoint);
-
- String fileName = getCheckpointFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
-
- try (FileOutputStream fos = new FileOutputStream(fileName);
- ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
- oosToFos.writeObject(checkpointObject);
- oosToFos.flush();
- } catch (IOException e) {
- throw new ACIDException("Failed to checkpoint", e);
- }
-
- //#. delete the previous checkpoint files
- if (prevCheckpointFiles != null) {
- // sort the filenames lexicographically to keep the latest checkpointHistory files.
- Arrays.sort(prevCheckpointFiles);
- for (int i = 0; i < prevCheckpointFiles.length - this.checkpointHistory; ++i) {
- prevCheckpointFiles[i].delete();
- }
- }
-
- if (isSharpCheckpoint) {
- try {
- if (minMCTFirstLSN == SHARP_CHECKPOINT_LSN) {
- logMgr.renewLogFiles();
- } else {
- logMgr.deleteOldLogFiles(minMCTFirstLSN);
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
-
- if (nonSharpCheckpointSucceeded) {
- logMgr.deleteOldLogFiles(minMCTFirstLSN);
- }
-
- if (isSharpCheckpoint) {
- LOGGER.info("Completed sharp checkpoint.");
- }
-
- //return the min LSN that was recorded in the checkpoint
- return minMCTFirstLSN;
- }
-
- @Override
public long getMinFirstLSN() throws HyracksDataException {
long minFirstLSN = getLocalMinFirstLSN();
@@ -559,16 +418,16 @@
@Override
public long getLocalMinFirstLSN() throws HyracksDataException {
- IDatasetLifecycleManager datasetLifecycleManager =
- txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+ IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources();
long firstLSN;
//the min first lsn can only be the current append or smaller
long minFirstLSN = logMgr.getAppendLSN();
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- AbstractLSMIOOperationCallback ioCallback =
- (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback();
+ AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
+ .getIOOperationCallback();
if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
firstLSN = ioCallback.getFirstLSN();
@@ -580,60 +439,12 @@
}
private long getRemoteMinFirstLSN() {
- IReplicaResourcesManager remoteResourcesManager =
- txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
+ IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getAppContext().getReplicaResourcesManager();
long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
return minRemoteLSN;
}
- private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
- CheckpointObject checkpointObject = null;
-
- //read all checkpointObjects from the existing checkpoint files
- File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) {
- throw new FileNotFoundException("Checkpoint file is not found");
- }
-
- List<CheckpointObject> checkpointObjectList = new ArrayList<>();
- for (File file : prevCheckpointFiles) {
- try (FileInputStream fis = new FileInputStream(file);
- ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
- checkpointObject = (CheckpointObject) oisFromFis.readObject();
- checkpointObjectList.add(checkpointObject);
- } catch (Exception e) {
- throw new ACIDException("Failed to read a checkpoint file", e);
- }
- }
-
- //sort checkpointObjects in descending order by timeStamp to find out the most recent one.
- Collections.sort(checkpointObjectList);
-
- //return the most recent one (the first one in sorted list)
- return checkpointObjectList.get(0);
- }
-
- private File[] getPreviousCheckpointFiles() {
- String logDir = ((LogManager) txnSubsystem.getLogManager()).getLogManagerProperties().getLogDir();
- File parentDir = new File(logDir);
-
- FilenameFilter filter = new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.contains(CHECKPOINT_FILENAME_PREFIX);
- }
- };
-
- return parentDir.listFiles(filter);
- }
-
- private static String getCheckpointFileName(String baseDir, String suffix) {
- if (!baseDir.endsWith(System.getProperty("file.separator"))) {
- baseDir += System.getProperty("file.separator");
- }
- return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix;
- }
-
@Override
public File createJobRecoveryFile(int jobId, String fileName) throws IOException {
String recoveryDirPath = getRecoveryDirPath();
@@ -794,8 +605,8 @@
//undo loserTxn's effect
LOGGER.log(Level.INFO, "undoing loser transaction's effect");
- IDatasetLifecycleManager datasetLifecycleManager =
- txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+ IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
//TODO sort loser entities by smallest LSN to undo in one pass.
Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
int undoCount = 0;
@@ -836,12 +647,8 @@
@Override
public void stop(boolean dumpState, OutputStream os) throws IOException {
- try {
- checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
- } catch (HyracksDataException | ACIDException e) {
- e.printStackTrace();
- throw new IOException(e);
- }
+ // Shutdown checkpoint
+ checkpointManager.doSharpCheckpoint();
}
@Override
@@ -851,10 +658,10 @@
private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
try {
- ILSMIndex index =
- (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
- ILSMIndexAccessor indexAccessor =
- index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
+ logRecord.getResourceId());
+ ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
indexAccessor.forceDelete(logRecord.getNewValue());
} else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
@@ -871,10 +678,9 @@
try {
int datasetId = logRecord.getDatasetId();
long resourceId = logRecord.getResourceId();
- ILSMIndex index =
- (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId);
- ILSMIndexAccessor indexAccessor =
- index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId);
+ ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
indexAccessor.forceInsert(logRecord.getNewValue());
} else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
new file mode 100644
index 0000000..6fdee33
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of {@link ICheckpointManager} that defines the logic
+ * of checkpoints when replication is enabled..
+ */
+public class ReplicationCheckpointManager extends AbstractCheckpointManager {
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicationCheckpointManager.class.getName());
+
+ public ReplicationCheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
+ super(txnSubsystem, checkpointProperties);
+ }
+
+ /**
+ * Performs a sharp checkpoint. All datasets are flushed and all transaction
+ * log files are deleted except the files that are needed for dead replicas.
+ */
+ @Override
+ public synchronized void doSharpCheckpoint() throws HyracksDataException {
+ LOGGER.info("Starting sharp checkpoint...");
+ final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+ datasetLifecycleManager.flushAllDatasets();
+ long minFirstLSN;
+ // If shutting down, need to check if we need to keep any remote logs for dead replicas
+ if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) {
+ final Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
+ .getReplicationManager().getDeadReplicasIds();
+ if (deadReplicaIds.isEmpty()) {
+ // No dead replicas => no need to keep any log
+ minFirstLSN = SHARP_CHECKPOINT_LSN;
+ } else {
+ // Get min LSN of dead replicas remote resources
+ minFirstLSN = getDeadReplicasMinFirstLSN(deadReplicaIds);
+ }
+ } else {
+ // Start up complete checkpoint. Avoid deleting remote recovery logs.
+ minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+ }
+ capture(minFirstLSN, true);
+ if (minFirstLSN == SHARP_CHECKPOINT_LSN) {
+ // No need to keep any logs
+ txnSubsystem.getLogManager().renewLogFiles();
+ } else {
+ // Delete only log files with LSNs < any dead replica partition minimum LSN
+ txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+ }
+ LOGGER.info("Completed sharp checkpoint.");
+ }
+
+ /***
+ * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}.
+ * If a checkpoint cannot be captured due to datasets having LSN < {@code checkpointTargetLSN},
+ * an asynchronous flush is triggered on them. If the checkpoint fails due to a replica index,
+ * a request is sent to the primary replica of the index to flush it.
+ * When a checkpoint is successful, all transaction log files that end with
+ * LSN < {@code checkpointTargetLSN} are deleted.
+ */
+ @Override
+ public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
+ LOGGER.info("Attemping soft checkpoint...");
+ final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+ boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
+ if (!checkpointSucceeded) {
+ // Flush datasets with indexes behind target checkpoint LSN
+ final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+ datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
+ // Request remote replicas to flush lagging indexes
+ final IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getAppContext().getReplicationManager();
+ try {
+ replicationManager.requestFlushLaggingReplicaIndexes(checkpointTargetLSN);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ capture(minFirstLSN, false);
+ if (checkpointSucceeded) {
+ txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+ LOGGER.info(String.format("soft checkpoint succeeded with at LSN(%s)", minFirstLSN));
+ }
+ return minFirstLSN;
+ }
+
+ private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) {
+ final IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getAppContext().getReplicaResourcesManager();
+ final IPropertiesProvider propertiesProvider = (IPropertiesProvider) txnSubsystem
+ .getAsterixAppRuntimeContextProvider().getAppContext();
+ final MetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
+ final PersistentLocalResourceRepository localResourceRepository =
+ (PersistentLocalResourceRepository) txnSubsystem
+ .getAsterixAppRuntimeContextProvider().getLocalResourceRepository();
+ // Get partitions of the dead replicas that are not active on this node
+ final Set<Integer> deadReplicasPartitions = new HashSet<>();
+ for (String deadReplicaId : deadReplicaIds) {
+ final ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions().get(deadReplicaId);
+ for (ClusterPartition partition : nodePartitons) {
+ if (!localResourceRepository.getActivePartitions().contains(partition.getPartitionId())) {
+ deadReplicasPartitions.add(partition.getPartitionId());
+ }
+ }
+ }
+ return remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index f035029..b08ecbb 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -145,6 +145,7 @@
}
}
+ @Override
public int getMaxJobId() {
return maxJobId.get();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index ce1752a..09183fe 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,21 +22,25 @@
import java.util.concurrent.Future;
import java.util.logging.Logger;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.transactions.CheckpointProperties;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.ILockManager;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
-import org.apache.asterix.transaction.management.service.recovery.CheckpointThread;
+import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory;
import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
/**
@@ -50,8 +54,8 @@
private final ITransactionManager transactionManager;
private final IRecoveryManager recoveryManager;
private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider;
- private final CheckpointThread checkpointThread;
private final TransactionProperties txnProperties;
+ private final ICheckpointManager checkpointManager;
//for profiling purpose
public static final boolean IS_PROFILE_MODE = false;//true
@@ -66,6 +70,15 @@
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
+ final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
+ final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id);
+ checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled);
+ final Checkpoint latestCheckpoint = checkpointManager.getLatest();
+ if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) {
+ throw new IllegalStateException(
+ String.format("Storage version mismatch. Current version (%s). On disk version: (%s)",
+ latestCheckpoint.getStorageVersion(), StorageConstants.VERSION));
+ }
ReplicationProperties asterixReplicationProperties = null;
if (asterixAppRuntimeContextProvider != null) {
@@ -73,22 +86,13 @@
.getAppContext()).getReplicationProperties();
}
- if (asterixReplicationProperties != null && ClusterProperties.INSTANCE.isReplicationEnabled()) {
+ if (asterixReplicationProperties != null && replicationEnabled) {
this.logManager = new LogManagerWithReplication(this);
} else {
this.logManager = new LogManager(this);
}
-
this.recoveryManager = new RecoveryManager(this);
- if (asterixAppRuntimeContextProvider != null) {
- this.checkpointThread = new CheckpointThread(recoveryManager, logManager,
- this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
- this.checkpointThread.start();
- } else {
- this.checkpointThread = null;
- }
-
if (IS_PROFILE_MODE) {
ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval());
fecp = (Future<Object>) getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp);
@@ -133,6 +137,11 @@
++profilerEntityCommitLogCount;
}
+ @Override
+ public ICheckpointManager getCheckpointManager() {
+ return checkpointManager;
+ }
+
/**
* Thread for profiling entity level commit count
* This thread takes a report interval (in seconds) parameter and