Rollback is implemented and all existing tests pass.(The existing failure test cases work, but more test cases need to be added).
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@979 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
index a46e52a..37983c9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
@@ -42,11 +42,14 @@
private final List<LogicalVariable> primaryKeyLogicalVars;
private final JobId jobId;
private final int datasetId;
+ private final boolean isWriteTransaction;
- public CommitPOperator(JobId jobId, int datasetId, List<LogicalVariable> primaryKeyLogicalVars) {
+ public CommitPOperator(JobId jobId, int datasetId, List<LogicalVariable> primaryKeyLogicalVars,
+ boolean isWriteTransaction) {
this.jobId = jobId;
this.datasetId = datasetId;
this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+ this.isWriteTransaction = isWriteTransaction;
}
@Override
@@ -85,7 +88,7 @@
primaryKeyLogicalVars, varTypeEnv, context);
CommitRuntimeFactory runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
- binaryHashFunctionFactories);
+ binaryHashFunctionFactories, isWriteTransaction);
builder.contributeMicroOperator(op, runtime, recDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 9f04a0e..98dbb5b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -27,6 +27,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext.TransactionType;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -46,6 +47,7 @@
private final DatasetId datasetId;
private final int[] primaryKeyFields;
private final IBinaryHashFunction[] primaryKeyHashFunctions;
+ private final boolean isWriteTransaction;
private TransactionContext transactionContext;
private RecordDescriptor inputRecordDesc;
@@ -53,7 +55,7 @@
private FrameTupleReference frameTupleReference;
public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
- IBinaryHashFunctionFactory[] binaryHashFunctionFactories) {
+ IBinaryHashFunctionFactory[] binaryHashFunctionFactories, boolean isWriteTransaction) {
this.hyracksTaskCtx = ctx;
AsterixAppRuntimeContext runtimeCtx = (AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject();
@@ -66,12 +68,15 @@
this.primaryKeyHashFunctions[i] = binaryHashFunctionFactories[i].createBinaryHashFunction();
}
this.frameTupleReference = new FrameTupleReference();
+ this.isWriteTransaction = isWriteTransaction;
}
@Override
public void open() throws HyracksDataException {
try {
transactionContext = transactionManager.getTransactionContext(jobId);
+ transactionContext.setTransactionType(isWriteTransaction ? TransactionType.READ_WRITE
+ : TransactionType.READ);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
index df7da97..262fa8f 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -35,13 +35,15 @@
private final int datasetId;
private final int[] primaryKeyFields;
IBinaryHashFunctionFactory[] binaryHashFunctionFactories;
+ private final boolean isWriteTransaction;
public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
- IBinaryHashFunctionFactory[] binaryHashFunctionFactories) {
+ IBinaryHashFunctionFactory[] binaryHashFunctionFactories, boolean isWriteTransaction) {
this.jobId = jobId;
this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
this.binaryHashFunctionFactories = binaryHashFunctionFactories;
+ this.isWriteTransaction = isWriteTransaction;
}
@Override
@@ -51,6 +53,6 @@
@Override
public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
- return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, binaryHashFunctionFactories);
+ return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, binaryHashFunctionFactories, isWriteTransaction);
}
}
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
index 02b7d1b..026725c 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
@@ -76,7 +76,7 @@
//create the logical and physical operator
CommitOperator commitOperator = new CommitOperator();
- CommitPOperator commitPOperator = new CommitPOperator(jobId, datasetId, primaryKeyLogicalVars);
+ CommitPOperator commitPOperator = new CommitPOperator(jobId, datasetId, primaryKeyLogicalVars, mp.isWriteTransaction());
commitOperator.setPhysicalOperator(commitPOperator);
//create ExtensionOperator and put the commitOperator in it.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index 2649e2c..ce23c17 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -203,6 +203,7 @@
edu.uci.ics.asterix.transaction.management.service.transaction.JobId asterixJobId = JobIdFactory
.generateJobId();
+ queryMetadataProvider.setJobId(asterixJobId);
AqlExpressionToPlanTranslator t = new AqlExpressionToPlanTranslator(queryMetadataProvider, varCounter,
outputDatasetName, statement);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index b363d1c..591d8a8 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -83,7 +83,6 @@
import edu.uci.ics.asterix.om.types.TypeSignature;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
import edu.uci.ics.asterix.translator.AbstractAqlTranslator;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledBeginFeedStatement;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledControlFeedStatement;
@@ -690,9 +689,6 @@
// Query Compilation (happens under the same ongoing metadata
// transaction)
sessionConfig.setGenerateJobSpec(true);
- if (metadataProvider.isWriteTransaction()) {
- metadataProvider.setJobId(JobIdFactory.generateJobId());
- }
JobSpecification spec = APIFramework.compileQuery(declaredFunctions, metadataProvider, query,
reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, out, pdf, stmt);
sessionConfig.setGenerateJobSpec(false);
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 3eb9fcf..111f76b 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -98,8 +98,7 @@
if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
continue;
}
- System.out.println(testFile.getAbsolutePath());
- ***********************************************************/
+ ************************************************************/
File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
File actualFile = new File(PATH_ACTUAL + File.separator
diff --git a/asterix-app/src/test/resources/runtimets/queries/failure/delete-rtree.aql b/asterix-app/src/test/resources/runtimets/queries/failure/delete-rtree.aql
index 465d94f..6fc11e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/failure/delete-rtree.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/failure/delete-rtree.aql
@@ -18,7 +18,7 @@
using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
(("path"="nc1://data/twitter/smalltweets.txt"),("format"="adm")) pre-sorted;
-delete $l from dataset MyData where $l.id>=50 die after 1500;
+delete $l from dataset MyData where $l.id>=50 die after 1000;
write output to nc1:"rttest/failure_delete-rtree.adm";
diff --git a/asterix-app/src/test/resources/runtimets/queries/failure/delete.aql b/asterix-app/src/test/resources/runtimets/queries/failure/delete.aql
index cd4a922..3da88f8 100644
--- a/asterix-app/src/test/resources/runtimets/queries/failure/delete.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/failure/delete.aql
@@ -29,7 +29,7 @@
using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
(("path"="nc1://data/tpch0.001/lineitem.tbl"),("format"="delimited-text"),("delimiter"="|")) pre-sorted;
-delete $l from dataset LineItem where $l.l_orderkey>=10 die after 1500;
+delete $l from dataset LineItem where $l.l_orderkey>=10 die after 1000;
write output to nc1:"rttest/failure_delete.adm";
for $c in dataset('LineItem')
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index a428042..fa5290c 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -918,7 +918,7 @@
<output-file compare="Text">delete-rtree.adm</output-file>
</compilation-unit>
</test-case>
-<!-- <test-case FilePath="failure">
+ <test-case FilePath="failure">
<compilation-unit name="delete">
<output-file compare="Text">delete.adm</output-file>
<expected-error>edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException</expected-error>
@@ -926,7 +926,7 @@
<compilation-unit name="verify_delete">
<output-file compare="Text">delete.adm</output-file>
</compilation-unit>
- </test-case> -->
+ </test-case>
<test-case FilePath="failure">
<compilation-unit name="insert-rtree">
<output-file compare="Text">insert-rtree.adm</output-file>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
index fbaa12f..4532390 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
@@ -42,6 +42,8 @@
public void registerTransactionalResource(long resourceId, Object resource) {
synchronized (resourceRepository) {
mutableResourceId.setId(resourceId);
+// MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
+// resourceRepository.put(newMutableResourceId, resource);
if (resourceRepository.get(resourceId) == null) {
MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
resourceRepository.put(newMutableResourceId, resource);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 050f1a1..6df3928 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -515,7 +515,7 @@
"Unsupported unlock request: dataset-granule unlock is not supported");
}
}
-
+
latchLockTable();
validateJob(txnContext);
@@ -527,37 +527,24 @@
//find the resource to be unlocked
dLockInfo = datasetResourceHT.get(datasetId);
jobInfo = jobHT.get(jobId);
- if (IS_DEBUG_MODE) {
- if (dLockInfo == null || jobInfo == null) {
- throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
- }
- }
-
- //////////////////////////////////////////////////////////////
- //TODO
- //Check whether the dLockInfo or jobInfo could be null
- //even when the callback is called properly
if (dLockInfo == null || jobInfo == null) {
unlatchLockTable();
- return;
+ throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
}
- /////////////////////////////////////////////////////////////
-
+
eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
-
- if (IS_DEBUG_MODE) {
- if (eLockInfo == -1) {
- throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
- }
+
+ if (eLockInfo == -1) {
+ unlatchLockTable();
+ throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
}
//find the corresponding entityInfo
entityInfo = entityLockInfoManager.findEntityInfoFromHolderList(eLockInfo, jobId.getId(), entityHashValue);
- if (IS_DEBUG_MODE) {
- if (entityInfo == -1) {
- throw new IllegalStateException("Invalid unlock request[" + jobId.getId() + "," + datasetId.getId()
- + "," + entityHashValue + "]: Corresponding lock info doesn't exist.");
- }
+ if (entityInfo == -1) {
+ unlatchLockTable();
+ throw new IllegalStateException("Invalid unlock request[" + jobId.getId() + "," + datasetId.getId() + ","
+ + entityHashValue + "]: Corresponding lock info doesn't exist.");
}
//decrease the corresponding count of dLockInfo/eLockInfo/entityInfo
@@ -579,8 +566,16 @@
//This commit log is written here in order to avoid increasing the memory space for managing transactionIds
if (commitFlag) {
if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
- txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(), entityHashValue,
- -1, (byte) 0, 0, null, null, logicalLogLocator);
+ try {
+ txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(),
+ entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
+ } catch (ACIDException e) {
+ try {
+ requestAbort(txnContext);
+ } finally {
+ unlatchLockTable();
+ }
+ }
}
txnContext.setLastLSNToIndexes(logicalLogLocator.getLsn());
@@ -660,7 +655,7 @@
trackLockRequest("Requested", RequestType.RELEASE_LOCKS, new DatasetId(0), 0, (byte) 0, txnContext,
dLockInfo, eLockInfo);
}
-
+
JobInfo jobInfo = jobHT.get(jobId);
if (jobInfo == null) {
unlatchLockTable();
@@ -971,7 +966,7 @@
StringBuilder s = new StringBuilder();
LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, datasetIdObj,
entityHashValue, lockMode, txnContext);
- s.append(Thread.currentThread().getId()+":");
+ s.append(Thread.currentThread().getId() + ":");
s.append(msg);
if (msg.equals("Granted")) {
if (dLockInfo != null) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
index cb6a67a..a70fbe2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -43,6 +43,8 @@
long resourceId = logRecordHelper.getResourceId(logLocator);
int offset = logRecordHelper.getLogContentBeginPos(logLocator);
+ //TODO
+ //replace TransactionResourceRepository with IndexLifeCycleManager
// look up the repository to obtain the resource object
IIndex index = (IIndex) provider.getTransactionalResourceRepository().getTransactionalResource(resourceId);
@@ -95,9 +97,6 @@
indexAccessor.physicalDelete(newTuple);
}
} else {
- //For LSMRtree and LSMInvertedIndex
- //delete --> insert
- //insert --> delete
if (newOperation == (byte) IndexOperation.DELETE.ordinal()) {
indexAccessor.insert(newTuple);
} else {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index f1e3877..61c47f4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -25,12 +25,12 @@
private final ILogFilter logFilter;
private IBuffer readOnlyBuffer;
private LogicalLogLocator logicalLogLocator = null;
- private int bufferIndex = 0;
+ private long bufferIndex = 0;
private boolean firstNext = true;
private boolean readMemory = false;
private long readLSN = 0;
private boolean needReloadBuffer = true;
-
+
/**
* @param logFilter
*/
@@ -56,7 +56,8 @@
String filePath = LogUtil.getLogFilePath(logManager.getLogManagerProperties(), fileId);
File file = new File(filePath);
if (file.exists()) {
- return FileUtil.getFileBasedBuffer(filePath, lsn, size);
+ return FileUtil.getFileBasedBuffer(filePath, lsn
+ % logManager.getLogManagerProperties().getLogPartitionSize(), size);
} else {
return null;
}
@@ -75,6 +76,8 @@
@Override
public boolean next(LogicalLogLocator currentLogLocator) throws IOException, ACIDException {
+ //TODO
+ //Test the correctness when multiple log files are created
int integerRead = -1;
boolean logRecordBeginPosFound = false;
long bytesSkipped = 0;
@@ -116,11 +119,19 @@
// bytes without finding a log record, it
// indicates an absence of logs any further.
}
+
+ if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+ return next(currentLogLocator); //should read from memory if there is any further log
+ }
}
if (!logRecordBeginPosFound) {
// need to reload the buffer
- long lsnpos = (++bufferIndex * logManager.getLogManagerProperties().getLogBufferSize());
+ // TODO
+ // reduce IO by reading more pages(equal to logBufferSize) at a time.
+ long lsnpos = ((logicalLogLocator.getLsn() / logManager.getLogManagerProperties().getLogPageSize()) + 1)
+ * logManager.getLogManagerProperties().getLogPageSize();
+
readOnlyBuffer = getReadOnlyBuffer(lsnpos, logManager.getLogManagerProperties().getLogBufferSize());
if (readOnlyBuffer != null) {
logicalLogLocator.setBuffer(readOnlyBuffer);
@@ -163,7 +174,7 @@
private boolean readFromMemory(LogicalLogLocator currentLogLocator) throws ACIDException, IOException {
byte[] logRecord = null;
long lsn = logicalLogLocator.getLsn();
-
+
//set the needReloadBuffer to true
needReloadBuffer = true;
@@ -209,7 +220,7 @@
// need to read the next log page
readOnlyBuffer = null;
logicalLogLocator.setBuffer(null);
- logicalLogLocator.setLsn(lsn/logPageSize+1);
+ logicalLogLocator.setLsn(lsn / logPageSize + 1);
logicalLogLocator.setMemoryOffset(0);
return next(currentLogLocator);
}
@@ -227,7 +238,7 @@
readOnlyBuffer = memBuffer;
logicalLogLocator.setBuffer(readOnlyBuffer);
logicalLogLocator.setMemoryOffset(0);
-
+
if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
if (currentLogLocator == null) {
currentLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
@@ -249,7 +260,7 @@
return next(currentLogLocator);
}
return logFilter.accept(readOnlyBuffer, currentLogLocator.getMemoryOffset(), logLength);
-
+
} else {
return next(currentLogLocator);//read from disk
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 0487aee..2221e86 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -36,7 +36,6 @@
public class LogManager implements ILogManager {
-
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
private TransactionSubsystem provider;
@@ -156,7 +155,7 @@
}
public void addFlushRequest(int pageIndex) {
- pendingFlushRequests[pageIndex].add(Thread.currentThread());
+ pendingFlushRequests[pageIndex].add(pendingFlushRequests);
}
public AtomicLong getLastFlushedLsn() {
@@ -427,6 +426,9 @@
}
if (forwardPage) {
+ //TODO
+ //this is not safe since the incoming thread may reach the same page slot with this page
+ //(differ by the log buffer size)
logPageStatus[prevPage].set(PageState.INACTIVE); // mark
// previous
// page
@@ -491,8 +493,8 @@
@Override
public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
- byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject,
- ILogger logger, LogicalLogLocator logicalLogLocator) throws ACIDException {
+ byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
+ LogicalLogLocator logicalLogLocator) throws ACIDException {
/*
* logLocator is a re-usable object that is appropriately set in each
* invocation. If the reference is null, the log manager must throw an
@@ -531,6 +533,9 @@
previousLSN = context.getLastLogLocator().getLsn();
currentLSN = getLsn(totalLogSize, logType);
context.setLastLSN(currentLSN);
+ if (IS_DEBUG_MODE) {
+ System.out.println("--------------> LSN(" + currentLSN + ") is allocated");
+ }
logicalLogLocator.setLsn(currentLSN);
}
@@ -567,7 +572,7 @@
*/
logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
resourceId, resourceMgrId, logContentSize);
-
+
// increment the offset so that the transaction can fill up the
// content in the correct region of the allocated space.
logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
@@ -585,7 +590,8 @@
logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
logger.postLog(context, reusableLogContentObject);
if (IS_DEBUG_MODE) {
- logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType));
+ logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset()
+ - logRecordHelper.getLogHeaderSize(logType));
System.out.println(logRecordHelper.getLogRecordForDisplay(logicalLogLocator));
logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
}
@@ -598,8 +604,12 @@
int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
int length = totalLogSize - logRecordHelper.getLogChecksumSize();
long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
- logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType)
- + logContentSize, checksum);
+ logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType) + logContentSize,
+ checksum);
+
+ if (IS_DEBUG_MODE) {
+ System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ }
/*
* release the ownership as the log record has been placed in
@@ -614,8 +624,7 @@
* If the transaction thread happens to be the last owner of the log
* page the page must by marked as a candidate to be flushed.
*/
- if (pageDirtyCount == 0) {
- logPageStatus[pageIndex].set(PageState.INACTIVE);
+ if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) {
addFlushRequest(pageIndex);
addedFlushRequest = true;
}
@@ -627,24 +636,13 @@
* been flushed to disk because the containing log page filled up.
*/
if (logType == LogType.COMMIT) {
- if (getLastFlushedLsn().get() < currentLSN) {
- if (!addedFlushRequest) {
- addFlushRequest(pageIndex);
- }
-
- /*
- * the commit log record is still in log buffer. need to
- * wait until the containing log page is flushed. When the
- * log flusher thread does flush the page, it notifies all
- * waiting threads of the flush event.
- */
- synchronized (logPages[pageIndex]) {
- while (getLastFlushedLsn().get() < currentLSN) {
- logPages[pageIndex].wait();
- }
+ synchronized (logPages[pageIndex]) {
+ while (getLastFlushedLsn().get() < currentLSN) {
+ logPages[pageIndex].wait();
}
}
}
+
} catch (Exception e) {
e.printStackTrace();
throw new ACIDException(context, "Thread: " + Thread.currentThread().getName()
@@ -669,6 +667,9 @@
logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
logManagerProperties.getLogPageSize());
+
+ //TODO Check if this is necessary
+ //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0);
}
@Override
@@ -688,8 +689,7 @@
* Read a log that is residing on the disk.
*/
private void readDiskLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException {
- String filePath = LogUtil.getLogFilePath(logManagerProperties,
- LogUtil.getFileId(this, lsnValue));
+ String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
long fileOffset = LogUtil.getFileOffset(this, lsnValue);
ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
RandomAccessFile raf = null;
@@ -701,7 +701,7 @@
buffer.position(0);
byte logType = buffer.get(4);
int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
- int logBodySize = buffer.getInt(logHeaderSize-4);
+ int logBodySize = buffer.getInt(logHeaderSize - 4);
int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
buffer.limit(logRecordSize);
MemBasedBuffer memBuffer = new MemBasedBuffer(buffer.slice());
@@ -716,8 +716,8 @@
throw new ACIDException(" invalid log record at lsn " + lsnValue);
}
} catch (Exception fnfe) {
- throw new ACIDException(" unable to retrieve log record with lsn " + lsnValue
- + " from the file system", fnfe);
+ throw new ACIDException(" unable to retrieve log record with lsn " + lsnValue + " from the file system",
+ fnfe);
} finally {
try {
if (raf != null) {
@@ -742,10 +742,10 @@
if (lsnValue > getLastFlushedLsn().get()) {
int pageIndex = getLogPageIndex(lsnValue);
int pageOffset = getLogPageOffset(lsnValue);
-
+
//TODO
//minimize memory allocation overhead. current code allocates the log page size per reading a log record.
-
+
byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
// take a lock on the log page so that the page is not flushed to
// disk interim
@@ -759,7 +759,7 @@
// get the log record length
logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
- byte logType = pageContent[pageOffset+4];
+ byte logType = pageContent[pageOffset + 4];
int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
int logBodySize = DataUtil.byteArrayToInt(pageContent, pageOffset + logHeaderSize - 4);
int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
@@ -783,8 +783,7 @@
throw new ACIDException(" invalid log record at lsn " + lsnValue);
}
} catch (Exception e) {
- throw new ACIDException("exception encoutered in validating log record at lsn "
- + lsnValue, e);
+ throw new ACIDException("exception encoutered in validating log record at lsn " + lsnValue, e);
}
return;
}
@@ -996,6 +995,7 @@
// that got flushed.
logManager.getLogPages()[pageToFlush].notifyAll();
logManager.setLastFlushedPage(pageToFlush);
+
}
} catch (IOException ioe) {
ioe.printStackTrace();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
index fe48743..177533d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
@@ -40,13 +40,14 @@
private int logPageSize = 128 * 1024; // 128 KB
private int numLogPages = 8; // number of log pages in the log buffer.
- private long logPartitionSize = logPageSize * 250; // maximum size of each
- // log file
+
private long groupCommitWaitPeriod = 0; // time in milliseconds for which a
// commit record will wait before
// the housing page is marked for
// flushing.
private int logBufferSize = logPageSize * numLogPages;
+ // maximum size of each log file
+ private long logPartitionSize = logBufferSize * 1024 * 2; //2GB
public int logMagicNumber = 123456789;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 10d69dc..0905907 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -46,7 +46,6 @@
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.logging.PhysicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
-import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -59,6 +58,7 @@
*/
public class RecoveryManager implements IRecoveryManager {
+ public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
private TransactionSubsystem txnSubsystem;
@@ -176,6 +176,9 @@
Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+ int updateLogCount = 0;
+ int commitLogCount = 0;
+
// Obtain the first log record written by the Job
PhysicalLogLocator firstLSNLogLocator = txnContext.getFirstLogLocator();
PhysicalLogLocator lastLSNLogLocator = txnContext.getLastLogLocator();
@@ -212,10 +215,10 @@
List<Long> undoLSNSet = null;
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" collecting loser transaction's LSNs from " + firstLSNLogLocator.getLsn() + " to " +
- + lastLSNLogLocator.getLsn());
+ LOGGER.info(" collecting loser transaction's LSNs from " + firstLSNLogLocator.getLsn() + " to "
+ + +lastLSNLogLocator.getLsn());
}
-
+
while (currentLogLocator.getLsn() != lastLSNLogLocator.getLsn()) {
try {
valid = logCursor.next(currentLogLocator);
@@ -229,13 +232,13 @@
break;//End of Log File
}
}
-
+
if (LogManager.IS_DEBUG_MODE) {
System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
}
- tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), logRecordHelper.getDatasetId(currentLogLocator),
- logRecordHelper.getPKHashValue(currentLogLocator));
+ tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
+ logRecordHelper.getDatasetId(currentLogLocator), logRecordHelper.getPKHashValue(currentLogLocator));
logType = logRecordHelper.getLogType(currentLogLocator);
switch (logType) {
@@ -243,11 +246,17 @@
undoLSNSet = loserTxnTable.get(tempKeyTxnId);
if (undoLSNSet == null) {
TxnId txnId = new TxnId(logRecordHelper.getJobId(currentLogLocator),
- logRecordHelper.getDatasetId(currentLogLocator), logRecordHelper.getPKHashValue(currentLogLocator));
+ logRecordHelper.getDatasetId(currentLogLocator),
+ logRecordHelper.getPKHashValue(currentLogLocator));
undoLSNSet = new ArrayList<Long>();
loserTxnTable.put(txnId, undoLSNSet);
}
undoLSNSet.add(currentLogLocator.getLsn());
+ if (IS_DEBUG_MODE) {
+ updateLogCount++;
+ System.out.println("" + Thread.currentThread().getId() + "======> update["
+ + currentLogLocator.getLsn() + "]:" + tempKeyTxnId);
+ }
break;
case LogType.COMMIT:
@@ -255,6 +264,11 @@
if (undoLSNSet != null) {
loserTxnTable.remove(tempKeyTxnId);
}
+ if (IS_DEBUG_MODE) {
+ commitLogCount++;
+ System.out.println("" + Thread.currentThread().getId() + "======> commit["
+ + currentLogLocator.getLsn() + "]" + tempKeyTxnId);
+ }
break;
default:
@@ -266,11 +280,13 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" undoing loser transaction's effect");
}
-
+
TxnId txnId = null;
Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
byte resourceMgrId;
+ int undoCount = 0;
while (iter.hasNext()) {
+
Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
txnId = loserTxn.getKey();
@@ -280,7 +296,7 @@
for (long undoLSN : undoLSNSet) {
// here, all the log records are UPDATE type. So, we don't need to check the type again.
-
+
//read the corresponding log record to be undone.
logManager.readLog(undoLSN, currentLogLocator);
@@ -305,12 +321,20 @@
resourceMgrId, resourceMgr);
}
resourceMgr.undo(logRecordHelper, currentLogLocator);
+
+ if (IS_DEBUG_MODE) {
+ undoCount++;
+ }
}
}
-
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" undone loser transaction's effect");
}
+ if (IS_DEBUG_MODE) {
+ System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + commitLogCount + "/"
+ + undoCount);
+ }
}
}
@@ -338,6 +362,11 @@
}
@Override
+ public String toString() {
+ return "[" + jobId + "," + datasetId + "," + pkHashVal + "]";
+ }
+
+ @Override
public int hashCode() {
return pkHashVal;
}
@@ -347,7 +376,7 @@
if (o == this) {
return true;
}
- if (!(o instanceof JobId)) {
+ if (!(o instanceof TxnId)) {
return false;
}
TxnId txnId = (TxnId) o;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 3f71540..bafa753 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -74,6 +74,7 @@
@Override
public TransactionContext getTransactionContext(JobId jobId) throws ACIDException {
synchronized (transactionContextRepository) {
+
TransactionContext context = transactionContextRepository.get(jobId);
if (context == null) {
context = transactionContextRepository.get(jobId);