ASTERIXDB-1425 & ASTERIXDB-1450: Fix LogReader random reads
- Fix random reads for truncated logs (ASTERIXDB-1425).
- Fix log file partition size boundary check (ASTERIXDB-1450).
- Fix deadlock between LogReader and LogFlusher.
- Prevent checkpoints from deleting log files being accessed by rollbacks.
- Make rollbacks start from LSN = max(txnFirstLSN, minMemoryLSN).
- Make default log partition size 256MB instead of 2GB.
Change-Id: I1c75ca4a7c8fe197451126392389d4baecbd7e45
Reviewed-on: https://asterix-gerrit.ics.uci.edu/867
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index c4a5e8e..356dad3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -32,7 +32,7 @@
private static final int TXN_LOG_BUFFER_PAGESIZE_DEFAULT = (128 << 10); // 128KB
private static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
- private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = ((long) 2 << 30); // 2GB
+ private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = StorageUtil.getSizeInBytes(256L, StorageUnit.MEGABYTE);
private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold";
private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = (64 << 20); // 64M
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index cff4184..97d4897 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.transactions;
import java.io.IOException;
+import java.nio.channels.FileChannel;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
@@ -85,4 +86,23 @@
*/
public int getNumLogPages();
+ /**
+ * Opens a file channel to the log file which contains {@code LSN}.
+ * The start position of the file channel will be at the first LSN of the file.
+ *
+ * @param LSN
+ * @return
+ * @throws IOException
+ * if the log file does not exist.
+ */
+ public TxnLogFile getLogFile(long LSN) throws IOException;
+
+ /**
+ * Closes the log file.
+ *
+ * @param logFileRef
+ * @param fileChannel
+ * @throws IOException
+ */
+ public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
new file mode 100644
index 0000000..e535206
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TxnLogFile.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class TxnLogFile {
+
+ private final FileChannel fileChannel;
+ private final long logFileId;
+ private final long fileBeginLSN;
+ private final ILogManager logManager;
+ private boolean open = true;
+
+ public TxnLogFile(ILogManager logManager, FileChannel fileChannel, long logFileId, long fileBeginLSN) {
+ this.logManager = logManager;
+ this.fileChannel = fileChannel;
+ this.logFileId = logFileId;
+ this.fileBeginLSN = fileBeginLSN;
+ }
+
+ public void position(long newPosition) throws IOException {
+ fileChannel.position(newPosition);
+ }
+
+ public long size() throws IOException {
+ return fileChannel.size();
+ }
+
+ public int read(ByteBuffer readBuffer) throws IOException {
+ return fileChannel.read(readBuffer);
+ }
+
+ public long getLogFileId() {
+ return logFileId;
+ }
+
+ public synchronized void close() throws IOException {
+ if (open) {
+ logManager.closeLogFile(this, fileChannel);
+ open = false;
+ }
+ }
+
+ public long getFileBeginLSN() {
+ return fileBeginLSN;
+ }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index f10520b..be0435a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -47,6 +48,7 @@
import org.apache.asterix.common.transactions.LogManagerProperties;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.transactions.MutableLong;
+import org.apache.asterix.common.transactions.TxnLogFile;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -74,6 +76,7 @@
private final String nodeId;
protected LinkedBlockingQueue<ILogRecord> flushLogsQ;
private final FlushLogsLogger flushLogsLogger;
+ private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
public LogManager(TransactionSubsystem txnSubsystem) {
this.txnSubsystem = txnSubsystem;
@@ -148,7 +151,13 @@
"Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
}
}
- if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize) {
+
+ /**
+ * To eliminate the case where the modulo of the next appendLSN = 0 (the next
+ * appendLSN = the first LSN of the next log file), we do not allow a log to be
+ * written at the last offset of the current file.
+ */
+ if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() >= logFileSize) {
prepareNextLogFile();
appendPage.isFull(true);
getAndInitNewPage();
@@ -194,8 +203,28 @@
}
protected void prepareNextLogFile() {
+ //wait until all log records have been flushed in the current file
+ synchronized (flushLSN) {
+ while (flushLSN.get() != appendLSN.get()) {
+ //notification will come from LogBuffer.internalFlush(.)
+ try {
+ flushLSN.wait();
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe("Preparing new log file was interrupted");
+ }
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ //move appendLSN and flushLSN to the first LSN of the next log file
appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get()));
+ flushLSN.set(appendLSN.get());
appendChannel = getFileChannel(appendLSN.get(), true);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Created new txn log file with id(" + getLogFileId(appendLSN.get()) + ") starting with LSN = "
+ + appendLSN.get());
+ }
appendPage.isLastPage(true);
//[Notice]
//the current log file channel is closed if
@@ -323,15 +352,32 @@
@Override
public void deleteOldLogFiles(long checkpointLSN) {
-
Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
List<Long> logFileIds = getLogFileIds();
if (logFileIds != null) {
- for (Long id : logFileIds) {
- if (id < checkpointLSNLogFileID) {
+ //sort log files from oldest to newest
+ Collections.sort(logFileIds);
+ /**
+ * At this point, any future LogReader should read from LSN >= checkpointLSN
+ */
+ synchronized (txnLogFileId2ReaderCount) {
+ for (Long id : logFileIds) {
+ /**
+ * Stop deletion if:
+ * The log file which contains the checkpointLSN has been reached.
+ * The oldest log file being accessed by a LogReader has been reached.
+ */
+ if (id >= checkpointLSNLogFileID
+ || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
+ break;
+ }
+
+ //delete old log file
File file = new File(getLogFilePath(id));
- if (!file.delete()) {
- throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
+ file.delete();
+ txnLogFileId2ReaderCount.remove(id);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Deleted log file " + file.getAbsolutePath());
}
}
}
@@ -365,6 +411,7 @@
throw new IllegalStateException("Failed to close a fileChannel of a log file");
}
}
+ txnLogFileId2ReaderCount.clear();
List<Long> logFileIds = getLogFileIds();
if (logFileIds != null) {
for (Long id : logFileIds) {
@@ -434,7 +481,7 @@
return (new File(path)).mkdir();
}
- public FileChannel getFileChannel(long lsn, boolean create) {
+ private FileChannel getFileChannel(long lsn, boolean create) {
FileChannel newFileChannel = null;
try {
long fileId = getLogFileId(lsn);
@@ -496,6 +543,55 @@
return numLogPages;
}
+ @Override
+ public TxnLogFile getLogFile(long LSN) throws IOException {
+ long fileId = getLogFileId(LSN);
+ String logFilePath = getLogFilePath(fileId);
+ File file = new File(logFilePath);
+ if (!file.exists()) {
+ throw new IOException("Log file with id(" + fileId + ") was not found. Requested LSN: " + LSN);
+ }
+ RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r");
+ FileChannel newFileChannel = raf.getChannel();
+ TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize);
+ touchLogFile(fileId);
+ return logFile;
+ }
+
+ @Override
+ public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException {
+ if (!fileChannel.isOpen()) {
+ throw new IllegalStateException("File channel is not open");
+ }
+ fileChannel.close();
+ untouchLogFile(logFileRef.getLogFileId());
+ }
+
+ private void touchLogFile(long fileId) {
+ synchronized (txnLogFileId2ReaderCount) {
+ if (txnLogFileId2ReaderCount.containsKey(fileId)) {
+ txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1);
+ } else {
+ txnLogFileId2ReaderCount.put(fileId, 1);
+ }
+ }
+ }
+
+ private void untouchLogFile(long fileId) {
+ synchronized (txnLogFileId2ReaderCount) {
+ if (txnLogFileId2ReaderCount.containsKey(fileId)) {
+ int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1;
+ if (newReaderCount < 0) {
+ throw new IllegalStateException(
+ "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")");
+ }
+ txnLogFileId2ReaderCount.put(fileId, newReaderCount);
+ } else {
+ throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened.");
+ }
+ }
+ }
+
/**
* This class is used to log FLUSH logs.
* FLUSH logs are flushed on a different thread to avoid a possible deadlock in LogBuffer batchUnlock which calls PrimaryIndexOpeartionTracker.completeOperation
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 0b1d320..1592aba 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -20,15 +20,16 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogReader;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ILogRecord.RecordReadStatus;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.MutableLong;
+import org.apache.asterix.common.transactions.TxnLogFile;
/**
* NOTE: Many method calls of this class are not thread safe.
@@ -38,7 +39,7 @@
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogReader.class.getName());
- private final LogManager logMgr;
+ private final ILogManager logMgr;
private final long logFileSize;
private final int logPageSize;
private final MutableLong flushLSN;
@@ -48,14 +49,15 @@
private long readLSN;
private long bufferBeginLSN;
private long fileBeginLSN;
- private FileChannel fileChannel;
+ private TxnLogFile logFile;
private enum ReturnState {
FLUSH,
EOF
};
- public LogReader(LogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN, boolean isRecoveryMode) {
+ public LogReader(ILogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN,
+ boolean isRecoveryMode) {
this.logMgr = logMgr;
this.logFileSize = logFileSize;
this.logPageSize = logPageSize;
@@ -71,12 +73,13 @@
if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
return;
}
- getFileChannel();
+ getLogFile();
fillLogReadBuffer();
}
/**
* Get the next log record from the log file.
+ *
* @return A deserialized log record, or null if we have reached the end of the file.
* @throws ACIDException
*/
@@ -119,11 +122,14 @@
continue;
}
case BAD_CHKSUM: {
- LOGGER.severe("Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early.");
+ LOGGER.severe(
+ "Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early.");
return null;
}
case OK:
break;
+ default:
+ throw new IllegalStateException("Unexpected log read status: " + status);
}
// break the loop by default
@@ -136,14 +142,14 @@
private ReturnState waitForFlushOrReturnIfEOF() {
synchronized (flushLSN) {
- while (readLSN >= flushLSN.get()) {
+ while (readLSN > flushLSN.get()) {
if (isRecoveryMode) {
return ReturnState.EOF;
}
try {
if (IS_DEBUG_MODE) {
- LOGGER.info("waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: "
- + readLSN);
+ LOGGER.info(
+ "waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN);
}
flushLSN.wait();
} catch (InterruptedException e) {
@@ -156,15 +162,16 @@
/**
* Continues log analysis between log file splits.
+ *
* @return true if log continues, false if EOF
* @throws ACIDException
*/
private boolean refillLogReadBuffer() throws ACIDException {
try {
- if (readLSN % logFileSize == fileChannel.size()) {
- fileChannel.close();
+ if (readLSN % logFileSize == logFile.size()) {
+ logFile.close();
readLSN += logFileSize - (readLSN % logFileSize);
- getFileChannel();
+ getLogFile();
}
return fillLogReadBuffer();
} catch (IOException e) {
@@ -174,6 +181,7 @@
/**
* Fills the log buffer with data from the log file at the current position
+ *
* @return false if EOF, true otherwise
* @throws ACIDException
*/
@@ -183,17 +191,17 @@
}
private boolean fillLogReadBuffer(int readSize, ByteBuffer readBuffer) throws ACIDException {
- int size=0;
- int read=0;
+ int size = 0;
+ int read = 0;
readBuffer.position(0);
readBuffer.limit(readSize);
try {
- fileChannel.position(readLSN % logFileSize);
+ logFile.position(readLSN % logFileSize);
//We loop here because read() may return 0, but this simply means we are waiting on IO.
//Therefore we want to break out only when either the buffer is full, or we reach EOF.
- while( size < readSize && read != -1) {
- read = fileChannel.read(readBuffer);
- if(read>0) {
+ while (size < readSize && read != -1) {
+ read = logFile.read(readBuffer);
+ if (read > 0) {
size += read;
}
}
@@ -202,7 +210,7 @@
}
readBuffer.position(0);
readBuffer.limit(size);
- if(size == 0 && read == -1){
+ if (size == 0 && read == -1) {
return false; //EOF
}
bufferBeginLSN = readLSN;
@@ -213,38 +221,37 @@
@Override
public ILogRecord read(long LSN) throws ACIDException {
readLSN = LSN;
+ //wait for the log to be flushed if needed before trying to read it.
synchronized (flushLSN) {
- while (readLSN >= flushLSN.get()) {
+ while (readLSN > flushLSN.get()) {
try {
flushLSN.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
}
try {
- if (fileChannel == null) {
- getFileChannel();
+ if (logFile == null) {
+ //get the log file which contains readLSN
+ getLogFile();
fillLogReadBuffer();
- } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + fileChannel.size()) {
- fileChannel.close();
- getFileChannel();
+ } else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + logFile.size()) {
+ //log is not in the current log file
+ logFile.close();
+ getLogFile();
fillLogReadBuffer();
} else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) {
+ //log is not in the current read buffer
fillLogReadBuffer();
} else {
+ //log is either completely in the current read buffer or truncated
readBuffer.position((int) (readLSN - bufferBeginLSN));
}
} catch (IOException e) {
throw new ACIDException(e);
}
- boolean hasRemaining;
- if(readBuffer.position() == readBuffer.limit()){
- hasRemaining = refillLogReadBuffer();
- if(!hasRemaining){
- throw new ACIDException("LSN is out of bounds");
- }
- }
+
ByteBuffer readBuffer = this.readBuffer;
while (true) {
RecordReadStatus status = logRecord.readLogRecord(readBuffer);
@@ -256,14 +263,20 @@
continue;
}
case TRUNCATED: {
- throw new ACIDException("LSN is out of bounds");
+ if (!fillLogReadBuffer()) {
+ throw new IllegalStateException(
+ "Could not read LSN(" + LSN + ") from log file id " + logFile.getLogFileId());
+ }
+ //now read the complete log record
+ continue;
}
case BAD_CHKSUM: {
throw new ACIDException("Log record has incorrect checksum");
}
case OK:
break;
-
+ default:
+ throw new IllegalStateException("Unexpected log read status: " + status);
}
break;
}
@@ -272,16 +285,20 @@
return logRecord;
}
- private void getFileChannel() throws ACIDException {
- fileChannel = logMgr.getFileChannel(readLSN, false);
- fileBeginLSN = readLSN;
+ private void getLogFile() throws ACIDException {
+ try {
+ logFile = logMgr.getLogFile(readLSN);
+ fileBeginLSN = logFile.getFileBeginLSN();
+ } catch (IOException e) {
+ throw new ACIDException(e);
+ }
}
@Override
public void close() throws ACIDException {
try {
- if (fileChannel != null) {
- fileChannel.close();
+ if (logFile != null) {
+ logFile.close();
}
} catch (IOException e) {
throw new ACIDException(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 3e5c6cf..afb926b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -689,18 +689,34 @@
int abortedJobId = txnContext.getJobId().getId();
// Obtain the first/last log record LSNs written by the Job
long firstLSN = txnContext.getFirstLSN();
+ /**
+ * The effect of any log record with LSN below minFirstLSN has already been written to disk and
+ * will not be rolled back. Therefore, we will set the first LSN of the job to the maximum of
+ * minFirstLSN and the job's first LSN.
+ */
+ try {
+ long localMinFirstLSN = getLocalMinFirstLSN();
+ firstLSN = Math.max(firstLSN, localMinFirstLSN);
+ } catch (HyracksDataException e) {
+ throw new ACIDException(e);
+ }
long lastLSN = txnContext.getLastLSN();
-
- LOGGER.log(Level.INFO, "rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
+ }
// check if the transaction actually wrote some logs.
- if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
- LOGGER.log(Level.INFO,
- "no need to roll back as there were no operations by the transaction " + txnContext.getJobId());
+ if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN > lastLSN) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(
+ "no need to roll back as there were no operations by the transaction " + txnContext.getJobId());
+ }
return;
}
// While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns
- LOGGER.log(Level.INFO, "collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
+ }
Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<TxnId, List<Long>>();
TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
@@ -812,7 +828,6 @@
LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/"
+ entityCommitLogCount + "/" + undoCount);
}
-
} finally {
logReader.close();
}