fixed recovery bugs
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
index cf69bfe..dee652e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
@@ -43,7 +43,7 @@
if (!ctx.isShuttingdown() && immutableComponents.size() >= threshold) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ accessor.scheduleMerge(index.getIOOperationCallback());
}
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 4ebd921..684068b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
@@ -30,19 +31,35 @@
protected long firstLSN;
protected long lastLSN;
+ protected long[] immutableLastLSNs;
+ protected int readIndex;
+ protected int writeIndex;
public AbstractLSMIOOperationCallback() {
resetLSNs();
}
@Override
- public void beforeOperation() {
- // Do nothing.
+ public void setNumOfMutableComponents(int count) {
+ immutableLastLSNs = new long[count];
+ readIndex = 0;
+ writeIndex = 0;
}
@Override
- public void afterFinalize(ILSMComponent newComponent) {
- resetLSNs();
+ public void beforeOperation(LSMOperationType opType) {
+ if (opType == LSMOperationType.FLUSH) {
+ synchronized (this) {
+ immutableLastLSNs[writeIndex] = lastLSN;
+ writeIndex = (writeIndex + 1) % immutableLastLSNs.length;
+ resetLSNs();
+ }
+ }
+ }
+
+ @Override
+ public void afterFinalize(LSMOperationType opType, ILSMComponent newComponent) {
+ // Do nothing.
}
public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException;
@@ -85,18 +102,18 @@
firstLSN = -1;
lastLSN = -1;
}
-
+
public void updateLastLSN(long lastLSN) {
if (firstLSN == -1) {
firstLSN = lastLSN;
}
this.lastLSN = Math.max(this.lastLSN, lastLSN);
}
-
+
public long getFirstLSN() {
return firstLSN;
}
-
+
public long getLastLSN() {
return lastLSN;
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 87c497c..8e9b44e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -29,7 +30,7 @@
}
@Override
- public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
if (newComponent != null) {
LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) newComponent;
@@ -41,7 +42,11 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return lastLSN;
+ synchronized (this) {
+ long lsn = immutableLastLSNs[readIndex];
+ readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ return lsn;
+ }
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 60d4af1..5532f97 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -28,9 +29,9 @@
}
@Override
- public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
- if (oldComponents != null && newComponent != null) {
+ if (newComponent != null) {
LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) newComponent;
putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents);
}
@@ -40,7 +41,11 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return lastLSN;
+ synchronized (this) {
+ long lsn = immutableLastLSNs[readIndex];
+ readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ return lsn;
+ }
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 770d514..1497e17 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -28,9 +29,9 @@
}
@Override
- public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
- if (oldComponents != null && newComponent != null) {
+ if (newComponent != null) {
LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) newComponent;
putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
putLSNIntoMetadata(rtreeComponent.getBTree(), oldComponents);
@@ -41,7 +42,11 @@
public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
if (diskComponents == null) {
// Implies a flush IO operation.
- return lastLSN;
+ synchronized (this) {
+ long lsn = immutableLastLSNs[readIndex];
+ readIndex = (readIndex + 1) % immutableLastLSNs.length;
+ return lsn;
+ }
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
long maxLSN = -1;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index 8921f35..a3f42a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -68,9 +68,8 @@
@Override
public void append(ILogRecord logRecord, long appendLSN) {
logRecord.writeLogRecord(appendBuffer);
- if (logRecord.getLogType() == LogType.UPDATE) {
- logRecord.getTxnCtx().setLastLSN(logRecord.getResourceId(), appendLSN);
- }
+ logRecord.getTxnCtx().setLastLSN(logRecord.getLogType() == LogType.UPDATE ? logRecord.getResourceId() : -1,
+ appendLSN);
synchronized (this) {
appendOffset += logRecord.getLogSize();
if (IS_DEBUG_MODE) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index fcb9aa1..95ee767 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -143,6 +143,7 @@
int entityCommitLogCount = 0;
int jobCommitLogCount = 0;
int redoCount = 0;
+ int abortLogCount = 0;
int jobId = -1;
state = SystemState.RECOVERING;
@@ -210,7 +211,7 @@
entityCommitLogCount++;
break;
case LogType.ABORT:
- //ignore
+ abortLogCount++;
break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -287,6 +288,7 @@
* log record.
*******************************************************************/
if (localResource == null) {
+ logRecord = logReader.next();
continue;
}
/*******************************************************************/
@@ -338,8 +340,8 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("[RecoveryMgr] recovery is completed.");
- LOGGER.info("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
- + entityCommitLogCount + "/" + jobCommitLogCount + "/" + redoCount);
+ LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = " + updateLogCount + "/"
+ + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + "/" + redoCount);
}
}
@@ -664,7 +666,7 @@
logReader.close();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" undone loser transaction's effect");
- LOGGER.info("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount + "/"
+ LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/" + entityCommitLogCount + "/"
+ undoCount);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 4468109..6fb91c8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -128,9 +128,12 @@
synchronized (indexMap) {
firstLSN.compareAndSet(-1, LSN);
lastLSN.set(Math.max(lastLSN.get(), LSN));
- tempResourceIdForSetLSN.set(resourceId);
- AbstractLSMIOOperationCallback ioOpCallback = indexMap.get(tempResourceIdForSetLSN);
- ioOpCallback.updateLastLSN(LSN);
+ if (resourceId != -1) {
+ //Non-update log's resourceId is -1.
+ tempResourceIdForSetLSN.set(resourceId);
+ AbstractLSMIOOperationCallback ioOpCallback = indexMap.get(tempResourceIdForSetLSN);
+ ioOpCallback.updateLastLSN(LSN);
+ }
}
}