addressed code review comments and reverted the way that the logFlusher is stopped.
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 76a2bee..a7c2fdb 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -68,7 +68,7 @@
public void open() throws HyracksDataException {
try {
transactionContext = transactionManager.getTransactionContext(jobId);
- transactionContext.isWriteTxn(isWriteTransaction);
+ transactionContext.setWriteTxn(isWriteTransaction);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index 531b10f..dc33e69 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -23,7 +23,7 @@
public JobId getJobId();
- public void isTimeout(boolean isTimeout);
+ public void setTimeout(boolean isTimeout);
public boolean isTimeout();
@@ -39,11 +39,11 @@
public boolean isWriteTxn();
- public void isWriteTxn(boolean isWriterTxn);
+ public void setWriteTxn(boolean isWriterTxn);
public String prettyPrint();
- public void isMetadataTransaction(boolean isMetadataTxn);
+ public void setMetadataTransaction(boolean isMetadataTxn);
public boolean isMetadataTransaction();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 5404ceb..77e960b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -29,13 +29,6 @@
* transaction has committed. ABORTED: The transaction has aborted.
* TIMED_OUT: The transaction has timed out waiting to acquire a lock.
*/
-// public enum TransactionState {
-// ACTIVE,
-// COMMITTED,
-// ABORTED,
-// TIMED_OUT,
-// };
-
public static final int ACTIVE = 0;
public static final int COMMITTED = 1;
public static final int ABORTED = 2;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index f48f267..8765aae 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -110,7 +110,7 @@
@Override
public void beginTransaction(JobId transactionId) throws ACIDException, RemoteException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().beginTransaction(transactionId);
- txnCtx.isMetadataTransaction(true);
+ txnCtx.setMetadataTransaction(true);
}
@Override
@@ -274,7 +274,7 @@
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
- txnCtx.isWriteTxn(true);
+ txnCtx.setWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
@@ -582,7 +582,7 @@
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
- txnCtx.isWriteTxn(true);
+ txnCtx.setWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
index 2782e63..b44755c 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -50,7 +50,7 @@
ITransactionManager txnManager = ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext()
.getApplicationObject()).getTransactionSubsystem().getTransactionManager();
ITransactionContext txnContext = txnManager.getTransactionContext(jobId);
- txnContext.isWriteTxn(transactionalWrite);
+ txnContext.setWriteTxn(transactionalWrite);
txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
!(jobStatus == JobStatus.FAILURE));
} catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 759b45b..e0baa3f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -972,7 +972,7 @@
jobHT.remove(jobId);
if (existWaiter) {
- txnContext.isTimeout(true);
+ txnContext.setTimeout(true);
txnContext.setTxnState(ITransactionManager.ABORTED);
}
@@ -1880,7 +1880,7 @@
}
private void requestAbort(ITransactionContext txnContext) throws ACIDException {
- txnContext.isTimeout(true);
+ txnContext.setTimeout(true);
throw new ACIDException("Transaction " + txnContext.getJobId()
+ " should abort (requested by the Lock Manager)");
}
@@ -2249,13 +2249,13 @@
unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
txnCtx.notifyOptracker(false);
} else if (logRecord.getLogType() == LogType.JOB_COMMIT) {
- ((LogPage) logPage).notifyJobCommitter();
tempJobIdObj.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
if (txnCtx == null) {
throw new IllegalStateException("TransactionContext[" + tempJobIdObj + "] doesn't exist.");
}
txnCtx.notifyOptracker(true);
+ ((LogPage) logPage).notifyJobCommitter();
}
logRecord = logPageReader.next();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 7e83140..7fce48d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -163,7 +163,7 @@
return txnSubsystem;
}
- public synchronized long getAppendLSN() {
+ public long getAppendLSN() {
return appendLSN;
}
@@ -260,7 +260,7 @@
return logFileSize * fileId + offset;
}
- public synchronized void renewLogFiles() {
+ public void renewLogFiles() {
terminateLogFlusher();
deleteAllLogFiles();
initializeLogManager();
@@ -268,6 +268,11 @@
private void terminateLogFlusher() {
logFlusher.terminate();
+ try {
+ logFlusher.join();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
}
private void deleteAllLogFiles() {
@@ -301,13 +306,7 @@
Collections.sort(logFileIds, new Comparator<Long>() {
@Override
public int compare(Long arg0, Long arg1) {
- if (arg0 > arg1) {
- return 1;
- } else if (arg0 == arg1) {
- return 0;
- } else {
- return -1;
- }
+ return arg0.compareTo(arg1);
}
});
}
@@ -386,20 +385,7 @@
flushPage.notify();
}
}
- POISON_PILL.isStop(false);
flushQ.offer(POISON_PILL);
- synchronized (POISON_PILL) {
- if(!POISON_PILL.isStop()) {
- try {
- POISON_PILL.wait();
- } catch (InterruptedException e) {
- throw new IllegalStateException("Unexpected interrupted exception", e);
- }
- if (!POISON_PILL.isStop()) {
- throw new IllegalStateException("LogFlusher thread is igonoring termination request.");
- }
- }
- }
}
@Override
@@ -409,15 +395,11 @@
try {
flushPage = flushQ.take();
if (flushPage == POISON_PILL) {
- synchronized (POISON_PILL) {
- POISON_PILL.isStop(true);
- POISON_PILL.notify();
- break;
- }
+ break;
}
flushPage.flush();
} catch (InterruptedException e) {
- throw new IllegalStateException("Unexpected interrupted exception", e);
+ //ignore
}
emptyQ.offer(flushPage);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
index 80e6f4a..173088c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
@@ -39,6 +39,11 @@
private long bufferBeginLSN;
private long fileBeginLSN;
private FileChannel fileChannel;
+
+ private enum ReturnState {
+ FLUSH,
+ EOF
+ };
public LogReader(LogManager logMgr, long logFileSize, int logPageSize, MutableLong flushLSN, boolean isRecoveryMode) {
this.logMgr = logMgr;
@@ -53,42 +58,18 @@
@Override
public void initializeScan(long beginLSN) throws ACIDException {
readLSN = beginLSN;
- synchronized (flushLSN) {
- while (readLSN >= flushLSN.get()) {
- if (isRecoveryMode) {
- return;
- }
- try {
- if (IS_DEBUG_MODE) {
- LOGGER.info("initializeScan()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN);
- }
- flushLSN.wait();
- } catch (InterruptedException e) {
- //ignore.
- }
- }
+ if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
+ return;
}
getFileChannel();
readPage();
}
-
+
//for scanning
@Override
public ILogRecord next() throws ACIDException {
- synchronized (flushLSN) {
- while (readLSN >= flushLSN.get()) {
- if (isRecoveryMode) {
- return null;
- }
- try {
- if (IS_DEBUG_MODE) {
- LOGGER.info("next()| flushLSN: " + flushLSN.get() + ", readLSN: " + readLSN);
- }
- flushLSN.wait();
- } catch (InterruptedException e) {
- //ignore
- }
- }
+ if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
+ return null;
}
if (readBuffer.position() == readBuffer.limit() || !logRecord.readLogRecord(readBuffer)) {
readNextPage();
@@ -100,6 +81,26 @@
readLSN += logRecord.getLogSize();
return logRecord;
}
+
+ private ReturnState waitForFlushOrReturnIfEOF() {
+ synchronized (flushLSN) {
+ while (readLSN >= flushLSN.get()) {
+ if (isRecoveryMode) {
+ return ReturnState.EOF;
+ }
+ try {
+ if (IS_DEBUG_MODE) {
+ LOGGER.info("waitForFlushOrReturnIfEOF()| flushLSN: " + flushLSN.get() + ", readLSN: "
+ + readLSN);
+ }
+ flushLSN.wait();
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ return ReturnState.FLUSH;
+ }
+ }
private void readNextPage() throws ACIDException {
try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
index 6f571ab..380e524 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
@@ -207,20 +207,8 @@
@Override
public String getLogRecordForDisplay() {
StringBuilder builder = new StringBuilder();
- String logTypeDisplay = null;
- switch (logType) {
- case LogType.JOB_COMMIT:
- logTypeDisplay = "JOB_COMMIT";
- break;
- case LogType.UPDATE:
- logTypeDisplay = "UPDATE";
- break;
- case LogType.ENTITY_COMMIT:
- logTypeDisplay = "ENTITY_COMMIT";
- break;
- }
builder.append(" LSN : ").append(LSN);
- builder.append(" LogType : ").append(logTypeDisplay);
+ builder.append(" LogType : ").append(LogType.toString(logType));
builder.append(" JobId : ").append(jobId);
builder.append(" DatasetId : ").append(datasetId);
builder.append(" PKHashValue : ").append(PKHashValue);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
index 3fc710a..823c8d3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
@@ -19,5 +19,22 @@
public static final byte UPDATE = 0;
public static final byte JOB_COMMIT = 1;
public static final byte ENTITY_COMMIT = 2;
+ private static final String STRING_UPDATE = "UPDATE";
+ private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
+ private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
+ private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
+
+ public static String toString(byte logType) {
+ switch (logType) {
+ case LogType.UPDATE:
+ return STRING_UPDATE;
+ case LogType.JOB_COMMIT:
+ return STRING_JOB_COMMIT;
+ case LogType.ENTITY_COMMIT:
+ return STRING_ENTITY_COMMIT;
+ default:
+ return STRING_INVALID_LOG_TYPE;
+ }
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index af14a1d..81a73d5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -673,9 +673,9 @@
private void undo(ILogRecord logRecord) {
try {
- IIndex index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
+ ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
.getIndex(logRecord.getResourceId());
- ILSMIndexAccessor indexAccessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
if (logRecord.getResourceType() == ResourceType.LSM_BTREE) {
if (logRecord.getOldOp() != IndexOperation.NOOP.ordinal()) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 940ba23..678956b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -149,7 +149,7 @@
}
}
- public void isWriteTxn(boolean isWriteTxn) {
+ public void setWriteTxn(boolean isWriteTxn) {
this.isWriteTxn.set(isWriteTxn);
}
@@ -171,7 +171,7 @@
return jobId;
}
- public void isTimeout(boolean isTimeout) {
+ public void setTimeout(boolean isTimeout) {
this.isTimeout = isTimeout;
}
@@ -198,7 +198,7 @@
}
@Override
- public void isMetadataTransaction(boolean isMetadataTxn) {
+ public void setMetadataTransaction(boolean isMetadataTxn) {
this.isMetadataTxn = isMetadataTxn;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 82d980c..01bce83 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -60,8 +60,7 @@
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(msg);
}
- ae.printStackTrace();
- throw new Error(msg);
+ throw new Error(msg, ae);
} finally {
txnSubsystem.getLockManager().releaseLocks(txnCtx);
transactionContextRepository.remove(txnCtx.getJobId());