Merge branch 'master' into zheilbron/asterix_msr_demo
Conflicts:
asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index a7c2fdb..6dd11bd 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -19,10 +19,11 @@
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -39,11 +40,13 @@
private final IHyracksTaskContext hyracksTaskCtx;
private final ITransactionManager transactionManager;
+ private final ILogManager logMgr;
private final JobId jobId;
- private final DatasetId datasetId;
+ private final int datasetId;
private final int[] primaryKeyFields;
private final boolean isWriteTransaction;
private final long[] longHashes;
+ private final LogRecord logRecord;
private ITransactionContext transactionContext;
private RecordDescriptor inputRecordDesc;
@@ -56,12 +59,14 @@
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
+ this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
this.jobId = jobId;
- this.datasetId = new DatasetId(datasetId);
+ this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
this.frameTupleReference = new FrameTupleReference();
this.isWriteTransaction = isWriteTransaction;
this.longHashes = new long[2];
+ this.logRecord = new LogRecord();
}
@Override
@@ -82,11 +87,9 @@
for (int t = 0; t < nTuple; t++) {
frameTupleReference.reset(frameTupleAccessor, t);
pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
- try {
- transactionManager.commitTransaction(transactionContext, datasetId, pkHash);
- } catch (ACIDException e) {
- throw new HyracksDataException(e);
- }
+ logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
+ primaryKeyFields);
+ logMgr.log(logRecord);
}
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 9f41527..0bbb956 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.Executor;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
@@ -240,7 +239,7 @@
}
@Override
- public Executor getThreadExecutor() {
+ public AsterixThreadExecutor getThreadExecutor() {
return threadExecutor;
}
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
index 1b78651..98de802 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
@@ -15,8 +15,8 @@
package edu.uci.ics.asterix.api.common;
import java.util.List;
-import java.util.concurrent.Executor;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
@@ -121,7 +121,7 @@
}
@Override
- public Executor getThreadExecutor() {
+ public AsterixThreadExecutor getThreadExecutor() {
return asterixAppRuntimeContext.getThreadExecutor();
}
}
\ No newline at end of file
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 6a6332d..9c4d15b 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -1,50 +1,40 @@
-<!--
- ! Copyright 2009-2013 by The Regents of the University of California
- ! Licensed under the Apache License, Version 2.0 (the "License");
- ! you may not use this file except in compliance with the License.
- ! you may obtain a copy of the License from
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing, software
- ! distributed under the License is distributed on an "AS IS" BASIS,
- ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ! See the License for the specific language governing permissions and
- ! limitations under the License.
- !-->
+<!-- ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License"); ! you may
+ not use this file except in compliance with the License. ! you may obtain
+ a copy of the License from ! ! http://www.apache.org/licenses/LICENSE-2.0
+ ! ! Unless required by applicable law or agreed to in writing, software !
+ distributed under the License is distributed on an "AS IS" BASIS, ! WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ! See the
+ License for the specific language governing permissions and ! limitations
+ under the License. ! -->
<asterixConfiguration xmlns="asterixconf">
- <metadataNode>nc1</metadataNode>
- <store>
- <ncId>nc1</ncId>
- <storeDirs>nc1data</storeDirs>
- </store>
- <store>
- <ncId>nc2</ncId>
- <storeDirs>nc2data</storeDirs>
- </store>
- <transactionLogDir>
- <ncId>nc1</ncId>
- <txnLogDirPath>target/txnLogDir/nc1</txnLogDirPath>
- </transactionLogDir>
- <transactionLogDir>
- <ncId>nc2</ncId>
- <txnLogDirPath>target/txnLogDir/nc2</txnLogDirPath>
- </transactionLogDir>
- <property>
- <name>log.level</name>
- <value>WARNING</value>
- <description>Log level for running tests/build</description>
- </property>
- <property>
+ <metadataNode>nc1</metadataNode>
+ <store>
+ <ncId>nc1</ncId>
+ <storeDirs>nc1data</storeDirs>
+ </store>
+ <store>
+ <ncId>nc2</ncId>
+ <storeDirs>nc2data</storeDirs>
+ </store>
+ <transactionLogDir>
+ <ncId>nc1</ncId>
+ <txnLogDirPath>target/txnLogDir/nc1</txnLogDirPath>
+ </transactionLogDir>
+ <transactionLogDir>
+ <ncId>nc2</ncId>
+ <txnLogDirPath>target/txnLogDir/nc2</txnLogDirPath>
+ </transactionLogDir>
+ <property>
+ <name>log.level</name>
+ <value>WARNING</value>
+ <description>Log level for running tests/build</description>
+ </property>
+ <property>
<name>storage.memorycomponent.numpages</name>
<value>8</value>
<description>The number of pages to allocate for a memory component.
(Default = 8)
</description>
</property>
- <property>
- <name>txn.log.groupcommitinterval</name>
- <value>1</value>
- <description>The group commit wait time in milliseconds.</description>
- </property>
</asterixConfiguration>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
index 990278c..f2026e2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -14,19 +14,26 @@
*/
package edu.uci.ics.asterix.common.api;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
public class AsterixThreadExecutor implements Executor {
- private final Executor executor;
+ private final ExecutorService executorService;
public AsterixThreadExecutor(ThreadFactory threadFactory) {
- executor = Executors.newCachedThreadPool(threadFactory);
+ executorService = Executors.newCachedThreadPool(threadFactory);
}
@Override
public void execute(Runnable command) {
- executor.execute(command);
+ executorService.execute(command);
+ }
+
+ public Future<Object> submit(Callable command) {
+ return (Future<Object>) executorService.submit(command);
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
index 00540f4..978577a 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
@@ -27,12 +27,6 @@
private static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = ((long)2 << 30); // 2GB
- private static final String TXN_LOG_DISKSECTORSIZE_KEY = "txn.log.disksectorsize";
- private static final int TXN_LOG_DISKSECTORSIZE_DEFAULT = 4096;
-
- private static final String TXN_LOG_GROUPCOMMITINTERVAL_KEY = "txn.log.groupcommitinterval";
- private static int TXN_LOG_GROUPCOMMITINTERVAL_DEFAULT = 10; // 0.1ms
-
private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold";
private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = (64 << 20); // 64M
@@ -81,16 +75,6 @@
PropertyInterpreters.getLongPropertyInterpreter());
}
- public int getLogDiskSectorSize() {
- return accessor.getProperty(TXN_LOG_DISKSECTORSIZE_KEY, TXN_LOG_DISKSECTORSIZE_DEFAULT,
- PropertyInterpreters.getIntegerPropertyInterpreter());
- }
-
- public int getGroupCommitInterval() {
- return accessor.getProperty(TXN_LOG_GROUPCOMMITINTERVAL_KEY, TXN_LOG_GROUPCOMMITINTERVAL_DEFAULT,
- PropertyInterpreters.getIntegerPropertyInterpreter());
- }
-
public int getCheckpointLSNThreshold() {
return accessor.getProperty(TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY, TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT,
PropertyInterpreters.getIntegerPropertyInterpreter());
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 3015bf6..ded6cb0 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -15,8 +15,8 @@
package edu.uci.ics.asterix.common.transactions;
import java.util.List;
-import java.util.concurrent.Executor;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
@@ -31,7 +31,7 @@
public interface IAsterixAppRuntimeContextProvider {
- public Executor getThreadExecutor();
+ public AsterixThreadExecutor getThreadExecutor();
public IBufferCache getBufferCache();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index 54c86af..a752afa 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -70,17 +70,6 @@
throws ACIDException;
/**
- * @param datasetId
- * @param entityHashValue
- * @param txnContext
- * @throws ACIDException
- * TODO
- * @return
- */
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
- throws ACIDException;
-
- /**
* Call to lock and unlock a specific resource in a specific lock mode
*
* @param datasetId
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index d810ebd..d13ef6c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -20,16 +20,18 @@
public interface ILogRecord {
- public static final int COMMIT_LOG_SIZE = 21;
- public static final int UPDATE_LOG_BASE_SIZE = 56;
+ public static final int JOB_COMMIT_LOG_SIZE = 13;
+ public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 29;
+ public static final int UPDATE_LOG_BASE_SIZE = 64;
public boolean readLogRecord(ByteBuffer buffer);
public void writeLogRecord(ByteBuffer buffer);
-
- public void formCommitLogRecord(ITransactionContext txnCtx, byte logType, int jobId, int datasetId, int PKHashValue);
- public void setUpdateLogSize();
+ public void formJobCommitLogRecord(ITransactionContext txnCtx);
+
+ public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
+ ITupleReference tupleReference, int[] primaryKeyFields);
public ITransactionContext getTxnCtx();
@@ -98,11 +100,23 @@
public long getChecksum();
public void setChecksum(long checksum);
-
+
public long getLSN();
public void setLSN(long LSN);
public String getLogRecordForDisplay();
+ public void computeAndSetLogSize();
+
+ public int getPKValueSize();
+
+ public ITupleReference getPKValue();
+
+ public void setPKFields(int[] primaryKeyFields);
+
+ public void computeAndSetPKValueSize();
+
+ public void setPKValue(ITupleReference PKValue);
+
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
index 591d9b1..dd1e7b4 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
@@ -36,14 +36,10 @@
private final int logPageSize;
// number of log pages in the log buffer.
private final int numLogPages;
- // time in milliseconds
- private final long groupCommitWaitPeriod;
// logBufferSize = logPageSize * numLogPages;
private final int logBufferSize;
// maximum size of each log file
private final long logPartitionSize;
- // default disk sector size
- private final int diskSectorSize;
public LogManagerProperties(AsterixTransactionProperties txnProperties, String nodeId) {
this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX);
@@ -52,12 +48,9 @@
long logPartitionSize = txnProperties.getLogPartitionSize();
this.logDir = txnProperties.getLogDirectory(nodeId);
this.logFilePrefix = DEFAULT_LOG_FILE_PREFIX;
- this.groupCommitWaitPeriod = txnProperties.getGroupCommitInterval();
-
this.logBufferSize = logPageSize * numLogPages;
//make sure that the log partition size is the multiple of log buffer size.
this.logPartitionSize = (logPartitionSize / logBufferSize) * logBufferSize;
- this.diskSectorSize = txnProperties.getLogDiskSectorSize();
}
public long getLogPartitionSize() {
@@ -84,18 +77,10 @@
return logBufferSize;
}
- public long getGroupCommitWaitPeriod() {
- return groupCommitWaitPeriod;
- }
-
public String getLogDirKey() {
return logDirKey;
}
- public int getDiskSectorSize() {
- return diskSectorSize;
- }
-
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("log_dir_ : " + logDir + lineSeparator);
@@ -103,7 +88,6 @@
builder.append("log_page_size : " + logPageSize + lineSeparator);
builder.append("num_log_pages : " + numLogPages + lineSeparator);
builder.append("log_partition_size : " + logPartitionSize + lineSeparator);
- builder.append("group_commit_wait_period : " + groupCommitWaitPeriod + lineSeparator);
return builder.toString();
}
}
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index deeb5b0..084a3f8 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -126,21 +126,6 @@
</property>
<property>
- <name>txn.log.disksectorsize</name>
- <value>4096</value>
- <description>The size of a disk sector. (Default = "4096")
- </description>
- </property>
-
- <property>
- <name>txn.log.groupcommitinterval</name>
- <value>40</value>
- <description>The group commit wait time in milliseconds. (Default =
- "40" // 40ms)
- </description>
- </property>
-
- <property>
<name>txn.log.checkpoint.lsnthreshold</name>
<value>67108864</value>
<description>The size of the window that the maximum LSN is allowed to
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index dac3e95..031a26e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -58,6 +58,9 @@
protected void log(int PKHash, ITupleReference newValue, IndexOperation oldOp, ITupleReference oldValue)
throws ACIDException {
logRecord.setPKHashValue(PKHash);
+ logRecord.setPKFields(primaryKeyFields);
+ logRecord.setPKValue(newValue);
+ logRecord.computeAndSetPKValueSize();
if (newValue != null) {
logRecord.setNewValueSize(tupleWriter.bytesRequired(newValue));
logRecord.setNewValue(newValue);
@@ -73,7 +76,7 @@
logRecord.setOldValueSize(0);
}
}
- logRecord.setUpdateLogSize();
+ logRecord.computeAndSetLogSize();
txnSubsystem.getLogManager().log(logRecord);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index bc8b722..f1de9e6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -29,7 +29,6 @@
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
-import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
@@ -86,7 +85,6 @@
private LockRequestTracker lockRequestTracker; //for debugging
private ConsecutiveWakeupContext consecutiveWakeupContext;
- private final ILogRecord logRecord;
public LockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
this.txnSubsystem = txnSubsystem;
@@ -105,7 +103,6 @@
this.tempDatasetIdObj = new DatasetId(0);
this.tempJobIdObj = new JobId(0);
this.consecutiveWakeupContext = new ConsecutiveWakeupContext();
- this.logRecord = new LogRecord();
if (IS_DEBUG_MODE) {
this.lockRequestTracker = new LockRequestTracker();
}
@@ -643,22 +640,16 @@
@Override
public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false, false);
- }
-
- @Override
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
- throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+ internalUnlock(datasetId, entityHashValue, txnContext, false);
}
private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, true, false);
+ internalUnlock(datasetId, entityHashValue, txnContext, true);
}
private void internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
- boolean isInstant, boolean commitFlag) throws ACIDException {
+ boolean isInstant) throws ACIDException {
JobId jobId = txnContext.getJobId();
int eLockInfo = -1;
DatasetLockInfo dLockInfo = null;
@@ -702,22 +693,6 @@
+ "," + entityHashValue + "]: Corresponding lock info doesn't exist.");
}
- //////////////////////////////////////////////////////////
- //[Notice]
- //If both EntityLockCount and DatasetLockCount are 1,
- //then write entity-commit log and return without releasing the lock.
- //The lock will be released when the entity-commit log is flushed.
- if (commitFlag && entityInfoManager.getEntityLockCount(entityInfo) == 1
- && entityInfoManager.getDatasetLockCount(entityInfo) == 1) {
- if (txnContext.isWriteTxn()) {
- logRecord.formCommitLogRecord(txnContext, LogType.ENTITY_COMMIT, jobId.getId(), datasetId.getId(),
- entityHashValue);
- txnSubsystem.getLogManager().log(logRecord);
- }
- return;
- }
- //////////////////////////////////////////////////////////
-
datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo) == LockMode.S ? LockMode.IS
: LockMode.IX;
@@ -759,11 +734,6 @@
waiterObjId = waiterObj.getNextWaiterObjId();
}
if (threadCount == 0) {
- if (entityInfoManager.getEntityLockMode(entityInfo) == LockMode.X) {
- //TODO
- //write a commit log for the unlocked resource
- //need to figure out that instantLock() also needs to write a commit log.
- }
entityInfoManager.deallocate(entityInfo);
}
}
@@ -2244,17 +2214,11 @@
tempDatasetIdObj.setId(logRecord.getDatasetId());
tempJobIdObj.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
- if (txnCtx == null) {
- throw new IllegalStateException("TransactionContext[" + tempJobIdObj + "] doesn't exist.");
- }
unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
txnCtx.notifyOptracker(false);
} else if (logRecord.getLogType() == LogType.JOB_COMMIT) {
tempJobIdObj.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
- if (txnCtx == null) {
- throw new IllegalStateException("TransactionContext[" + tempJobIdObj + "] doesn't exist.");
- }
txnCtx.notifyOptracker(true);
((LogPage) logPage).notifyJobCommitter();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index caf04e0..e61cb55 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -21,6 +22,8 @@
import java.util.NoSuchElementException;
import java.util.Scanner;
+import org.apache.commons.io.FileUtils;
+
import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
@@ -36,6 +39,13 @@
public class LockManagerDeterministicUnitTest {
public static void main(String args[]) throws ACIDException, IOException, AsterixException {
+ //prepare configuration file
+ File cwd = new File(System.getProperty("user.dir"));
+ File asterixdbDir = cwd.getParentFile();
+ File srcFile = new File(asterixdbDir.getAbsoluteFile(), "asterix-app/src/main/resources/asterix-build-configuration.xml");
+ File destFile = new File(cwd, "target/classes/asterix-configuration.xml");
+ FileUtils.copyFile(srcFile, destFile);
+
//initialize controller thread
String requestFileName = new String(
"src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockRequestFile");
@@ -57,8 +67,8 @@
long defaultWaitTime;
public LockRequestController(String requestFileName) throws ACIDException, AsterixException {
- this.txnProvider = new TransactionSubsystem("LockManagerPredefinedUnitTest", null,
- new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
+ this.txnProvider = new TransactionSubsystem("nc1", null, new AsterixTransactionProperties(
+ new AsterixPropertiesAccessor()));
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockRequestFile b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockRequestFile
index fc2a883..c755895 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockRequestFile
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockRequestFile
@@ -1,20 +1,11 @@
T1 L J1 D1 E1 S
-T3 L J3 D1 E-1 S
-T2 L J2 D1 E-1 X
-T4 L J4 D1 E1 S
-T0 CST 1 3 -1
-T1 L J1 D1 E2 X
-T0 CST 3 -1
-T3 RL J3 D1 E-1 S
-T0 CST 1 3 -1
+T3 L J3 D1 E1 S
+T2 L J2 D1 E1 X
+T0 CSQ 1 3 -1
T1 UL J1 D1 E1 S
-T0 CST 1 3 -1
-T1 UL J1 D1 E2 X
-T0 CST 1 2 3 -1
-T3 END
+T3 UL J3 D1 E1 S
+T0 CST 1 2 3 -1
+T2 RL J2 D1 E1 X
T1 END
-T2 RL J2 D1 E-1 X
T2 END
-T0 CST 4 -1
-T4 UL J4 D1 E1 S
-T4 END
\ No newline at end of file
+T3 END
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index a4b6467..42172b0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -24,7 +24,11 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -58,6 +62,7 @@
private FileChannel appendChannel;
private LogPage appendPage;
private LogFlusher logFlusher;
+ private Future<Object> futureLogFlusher;
public LogManager(TransactionSubsystem txnSubsystem) throws ACIDException {
this.txnSubsystem = txnSubsystem;
@@ -86,8 +91,7 @@
appendChannel = getFileChannel(appendLSN, false);
getAndInitNewPage();
logFlusher = new LogFlusher(this, emptyQ, flushQ);
- logFlusher.setDaemon(true);
- txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(logFlusher);
+ futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
}
@Override
@@ -174,6 +178,7 @@
@Override
public void stop(boolean dumpState, OutputStream os) {
+ terminateLogFlusher();
if (dumpState) {
// #. dump Configurable Variables
dumpConfVars(os);
@@ -267,17 +272,31 @@
}
private void terminateLogFlusher() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Terminating LogFlusher thread ...");
+ }
logFlusher.terminate();
try {
- logFlusher.join();
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
+ futureLogFlusher.get();
+ } catch (ExecutionException | InterruptedException e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("---------- warning(begin): LogFlusher thread is terminated abnormally --------");
+ e.printStackTrace();
+ LOGGER.info("---------- warning(end) : LogFlusher thread is terminated abnormally --------");
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("LogFlusher thread is terminated.");
}
}
- private void deleteAllLogFiles() throws IOException {
+ private void deleteAllLogFiles() {
if (appendChannel != null) {
- appendChannel.close();
+ try {
+ appendChannel.close();
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to close a fileChannel of a log file");
+ }
}
List<Long> logFileIds = getLogFileIds();
for (Long id : logFileIds) {
@@ -367,43 +386,69 @@
}
}
-class LogFlusher extends Thread {
- private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.COMMIT_LOG_SIZE, null);
+class LogFlusher implements Callable<Boolean> {
+ private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_COMMIT_LOG_SIZE, null);
private final LogManager logMgr;//for debugging
private final LinkedBlockingQueue<LogPage> emptyQ;
private final LinkedBlockingQueue<LogPage> flushQ;
private LogPage flushPage;
+ private final AtomicBoolean isStarted;
+ private final AtomicBoolean terminateFlag;
public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogPage> emptyQ, LinkedBlockingQueue<LogPage> flushQ) {
this.logMgr = logMgr;
this.emptyQ = emptyQ;
this.flushQ = flushQ;
flushPage = null;
+ isStarted = new AtomicBoolean(false);
+ terminateFlag = new AtomicBoolean(false);
+
}
public void terminate() {
+ //make sure the LogFlusher thread started before terminating it.
+ synchronized (isStarted) {
+ while (!isStarted.get()) {
+ try {
+ isStarted.wait();
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ }
+
+ terminateFlag.set(true);
if (flushPage != null) {
synchronized (flushPage) {
flushPage.isStop(true);
flushPage.notify();
}
}
+ //[Notice]
+ //The return value doesn't need to be checked
+ //since terminateFlag will trigger termination if the flushQ is full.
flushQ.offer(POISON_PILL);
}
@Override
- public void run() {
+ public Boolean call() {
+ synchronized (isStarted) {
+ isStarted.set(true);
+ isStarted.notify();
+ }
while (true) {
flushPage = null;
try {
flushPage = flushQ.take();
- if (flushPage == POISON_PILL) {
- break;
+ if (flushPage == POISON_PILL || terminateFlag.get()) {
+ return true;
}
- flushPage.flush();
} catch (InterruptedException e) {
- //ignore
+ if (flushPage == null) {
+ continue;
+ }
}
+ flushPage.flush();
emptyQ.offer(flushPage);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index 45c3e65..edfec69 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -58,7 +58,7 @@
appendOffset = 0;
flushOffset = 0;
isLastPage = false;
- syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.COMMIT_LOG_SIZE);
+ syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_COMMIT_LOG_SIZE);
}
////////////////////////////////////
@@ -144,7 +144,7 @@
}
this.wait();
} catch (InterruptedException e) {
- throw new IllegalStateException(e);
+ continue;
}
}
endOffset = appendOffset;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
index 173088c..9dc966c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
@@ -121,7 +121,7 @@
readBuffer.limit(logPageSize);
try {
fileChannel.position(readLSN % logFileSize);
- size = fileChannel.read(readBuffer, logPageSize);
+ size = fileChannel.read(readBuffer);
} catch (IOException e) {
throw new ACIDException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
index 380e524..4b0e1f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
@@ -30,13 +30,18 @@
/*
* == LogRecordFormat ==
* ---------------------------
- * [Header1] (13 bytes) : for all log types
+ * [Header1] (5 bytes) : for all log types
* LogType(1)
* JobId(4)
+ * ---------------------------
+ * [Header2] (16 bytes + PKValueSize) : for entity_commit and update log types
* DatasetId(4) //stored in dataset_dataset in Metadata Node
* PKHashValue(4)
+ * PKFieldCnt(4)
+ * PKValueSize(4)
+ * PKValue(PKValueSize)
* ---------------------------
- * [Header2] (21 bytes) : only for update log type
+ * [Header3] (21 bytes) : only for update log type
* PrevLSN(8)
* ResourceId(8) //stored in .metadata of the corresponding index in NC node
* ResourceType(1)
@@ -45,18 +50,21 @@
* [Body] (Variable size) : only for update log type
* FieldCnt(4)
* NewOp(1)
- * NewValueLength(4)
- * NewValue(NewValueLength)
+ * NewValueSize(4)
+ * NewValue(NewValueSize)
* OldOp(1)
- * OldValueLength(4)
- * OldValue(OldValueLength)
+ * OldValueSize(4)
+ * OldValue(OldValueSize)
* ---------------------------
* [Tail] (8 bytes) : for all log types
* Checksum(8)
* ---------------------------
* = LogSize =
- * 1) JOB_COMMIT and ENTITY_COMMIT: 21 bytes
- * 2) UPDATE: 56 + old and new value size (13 + 21 + 14 + old and newValueSize + 8)
+ * 1) JOB_COMMIT_LOG_SIZE: 13 bytes (5 + 8)
+ * 2) ENTITY_COMMIT: 29 + PKSize (5 + 16 + PKSize + 8)
+ * --> ENTITY_COMMIT_LOG_BASE_SIZE = 29
+ * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 16 + PKSize + 21 + 14 + New/OldValueSize + 8)
+ * --> UPDATE_LOG_BASE_SIZE = 64
*/
public class LogRecord implements ILogRecord {
@@ -65,6 +73,9 @@
private int jobId;
private int datasetId;
private int PKHashValue;
+ private int PKFieldCnt;
+ private int PKValueSize;
+ private ITupleReference PKValue;
private long prevLSN;
private long resourceId;
private byte resourceType;
@@ -84,13 +95,18 @@
private long LSN;
private final AtomicBoolean isFlushed;
private final SimpleTupleWriter tupleWriter;
- private final SimpleTupleReference newTuple;
+ private final SimpleTupleReference readPKValue;
+ private final SimpleTupleReference readNewValue;
+ private final SimpleTupleReference readOldValue;
private final CRC32 checksumGen;
+ private int[] PKFields;
public LogRecord() {
isFlushed = new AtomicBoolean(false);
tupleWriter = new SimpleTupleWriter();
- newTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readPKValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readOldValue = (SimpleTupleReference) tupleWriter.createTupleReference();
checksumGen = new CRC32();
}
@@ -99,8 +115,16 @@
int beginOffset = buffer.position();
buffer.put(logType);
buffer.putInt(jobId);
- buffer.putInt(datasetId);
- buffer.putInt(PKHashValue);
+ if (logType != LogType.JOB_COMMIT) {
+ buffer.putInt(datasetId);
+ buffer.putInt(PKHashValue);
+ buffer.putInt(PKFieldCnt);
+ if (PKValueSize <= 0) {
+ throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+ }
+ buffer.putInt(PKValueSize);
+ writePKValue(buffer);
+ }
if (logType == LogType.UPDATE) {
buffer.putLong(prevLSN);
buffer.putLong(resourceId);
@@ -124,8 +148,16 @@
buffer.putLong(checksum);
}
+ private void writePKValue(ByteBuffer buffer) {
+ int i;
+ for (i = 0; i < PKFieldCnt; i++) {
+ buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]), PKValue.getFieldLength(PKFields[i]));
+ }
+ }
+
private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+ //writeTuple() doesn't change the position of the buffer.
buffer.position(buffer.position() + size);
}
@@ -141,8 +173,19 @@
try {
logType = buffer.get();
jobId = buffer.getInt();
- datasetId = buffer.getInt();
- PKHashValue = buffer.getInt();
+ if (logType == LogType.JOB_COMMIT) {
+ datasetId = -1;
+ PKHashValue = -1;
+ } else {
+ datasetId = buffer.getInt();
+ PKHashValue = buffer.getInt();
+ PKFieldCnt = buffer.getInt();
+ PKValueSize = buffer.getInt();
+ if (PKValueSize <= 0) {
+ throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+ }
+ PKValue = readPKValue(buffer);
+ }
if (logType == LogType.UPDATE) {
prevLSN = buffer.getLong();
resourceId = buffer.getLong();
@@ -151,18 +194,18 @@
fieldCnt = buffer.getInt();
newOp = buffer.get();
newValueSize = buffer.getInt();
- newValue = readTuple(buffer, newValueSize);
+ newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
if (resourceType == ResourceType.LSM_BTREE) {
oldOp = buffer.get();
if (oldOp != (byte) (IndexOperation.NOOP.ordinal())) {
oldValueSize = buffer.getInt();
if (oldValueSize > 0) {
- oldValue = readTuple(buffer, oldValueSize);
+ oldValue = readTuple(buffer, readOldValue, fieldCnt, oldValueSize);
}
}
}
} else {
- logSize = COMMIT_LOG_SIZE;
+ computeAndSetLogSize();
}
checksum = buffer.getLong();
if (checksum != generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE)) {
@@ -174,27 +217,54 @@
}
return true;
}
+
+ private ITupleReference readPKValue(ByteBuffer buffer) {
+ return readTuple(buffer, readPKValue, PKFieldCnt, PKValueSize);
+ }
- private ITupleReference readTuple(ByteBuffer buffer, int size) {
- newTuple.setFieldCount(fieldCnt);
- newTuple.resetByTupleOffset(buffer, buffer.position());
- buffer.position(buffer.position() + size);
- return newTuple;
+ private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+ destTuple.setFieldCount(fieldCnt);
+ destTuple.resetByTupleOffset(srcBuffer, srcBuffer.position());
+ srcBuffer.position(srcBuffer.position() + size);
+ return destTuple;
}
@Override
- public void formCommitLogRecord(ITransactionContext txnCtx, byte logType, int jobId, int datasetId, int PKHashValue) {
+ public void formJobCommitLogRecord(ITransactionContext txnCtx) {
this.txnCtx = txnCtx;
- this.logType = logType;
- this.jobId = jobId;
+ this.logType = LogType.JOB_COMMIT;
+ this.jobId = txnCtx.getJobId().getId();
+ this.datasetId = -1;
+ this.PKHashValue = -1;
+ computeAndSetLogSize();
+ }
+
+ @Override
+ public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
+ ITupleReference PKValue, int[] PKFields) {
+ this.txnCtx = txnCtx;
+ this.logType = LogType.ENTITY_COMMIT;
+ this.jobId = txnCtx.getJobId().getId();
this.datasetId = datasetId;
this.PKHashValue = PKHashValue;
- this.logSize = COMMIT_LOG_SIZE;
+ this.PKFieldCnt = PKFields.length;
+ this.PKValue = PKValue;
+ this.PKFields = PKFields;
+ computeAndSetPKValueSize();
+ computeAndSetLogSize();
}
@Override
- public void setUpdateLogSize() {
- logSize = UPDATE_LOG_BASE_SIZE + newValueSize + oldValueSize;
+ public void computeAndSetPKValueSize() {
+ int i;
+ PKValueSize = 0;
+ for (i = 0; i < PKFieldCnt; i++) {
+ PKValueSize += PKValue.getFieldLength(PKFields[i]);
+ }
+ }
+
+ private void setUpdateLogSize() {
+ logSize = UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize + oldValueSize;
if (resourceType != ResourceType.LSM_BTREE) {
logSize -= 5; //oldOp(byte: 1) + oldValueLength(int: 4)
} else {
@@ -205,18 +275,39 @@
}
@Override
+ public void computeAndSetLogSize() {
+ switch (logType) {
+ case LogType.UPDATE:
+ setUpdateLogSize();
+ break;
+ case LogType.JOB_COMMIT:
+ logSize = JOB_COMMIT_LOG_SIZE;
+ break;
+ case LogType.ENTITY_COMMIT:
+ logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
+ break;
+ default:
+ throw new IllegalStateException("Unsupported Log Type");
+ }
+ }
+
+ @Override
public String getLogRecordForDisplay() {
StringBuilder builder = new StringBuilder();
builder.append(" LSN : ").append(LSN);
builder.append(" LogType : ").append(LogType.toString(logType));
+ builder.append(" LogSize : ").append(logSize);
builder.append(" JobId : ").append(jobId);
- builder.append(" DatasetId : ").append(datasetId);
- builder.append(" PKHashValue : ").append(PKHashValue);
+ if (logType != LogType.JOB_COMMIT) {
+ builder.append(" DatasetId : ").append(datasetId);
+ builder.append(" PKHashValue : ").append(PKHashValue);
+ builder.append(" PKFieldCnt : ").append(PKFieldCnt);
+ builder.append(" PKSize: ").append(PKValueSize);
+ }
if (logType == LogType.UPDATE) {
builder.append(" PrevLSN : ").append(prevLSN);
builder.append(" ResourceId : ").append(resourceId);
builder.append(" ResourceType : ").append(resourceType);
- builder.append(" LogSize : ").append(logSize);
}
return builder.toString();
}
@@ -405,4 +496,25 @@
public void setLSN(long LSN) {
this.LSN = LSN;
}
+
+ @Override
+ public int getPKValueSize() {
+ return PKValueSize;
+ }
+
+ @Override
+ public ITupleReference getPKValue() {
+ return PKValue;
+ }
+
+ @Override
+ public void setPKFields(int[] primaryKeyFields) {
+ PKFields = primaryKeyFields;
+ PKFieldCnt = PKFields.length;
+ }
+
+ @Override
+ public void setPKValue(ITupleReference PKValue) {
+ this.PKValue = PKValue;
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index bac60ec..0b5a243d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -52,6 +53,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -133,8 +135,10 @@
public void startRecovery(boolean synchronous) throws IOException, ACIDException {
int updateLogCount = 0;
- int commitLogCount = 0;
+ int entityCommitLogCount = 0;
+ int jobCommitLogCount = 0;
int redoCount = 0;
+ int jobId = -1;
state = SystemState.RECOVERING;
@@ -142,9 +146,12 @@
LOGGER.info("[RecoveryMgr] starting recovery ...");
}
- //winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
- Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
- TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+ Set<Integer> winnerJobSet = new HashSet<Integer>();
+ Map<Integer, Set<TxnId>> jobId2WinnerEntitiesMap = new HashMap<Integer, Set<TxnId>>();
+ //winnerEntity is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
+ Set<TxnId> winnerEntitySet = null;
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+ TxnId winnerEntity = null;
//#. read checkpoint file and set lowWaterMark where anaylsis and redo start
CheckpointObject checkpointObject = readCheckpoint();
@@ -157,8 +164,6 @@
//-------------------------------------------------------------------------
// [ analysis phase ]
// - collect all committed Lsn
- // - if there are duplicate commits for the same TxnId,
- // keep only the mostRecentCommitLsn among the duplicates.
//-------------------------------------------------------------------------
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("[RecoveryMgr] in analysis phase");
@@ -176,23 +181,34 @@
if (logRecord.getJobId() > maxJobId) {
maxJobId = logRecord.getJobId();
}
- TxnId commitTxnId = null;
switch (logRecord.getLogType()) {
case LogType.UPDATE:
if (IS_DEBUG_MODE) {
updateLogCount++;
}
break;
-
case LogType.JOB_COMMIT:
- case LogType.ENTITY_COMMIT:
- commitTxnId = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
- winnerTxnTable.put(commitTxnId, logRecord.getLSN());
+ winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
+ jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
if (IS_DEBUG_MODE) {
- commitLogCount++;
+ jobCommitLogCount++;
}
break;
-
+ case LogType.ENTITY_COMMIT:
+ jobId = logRecord.getJobId();
+ winnerEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+ if (!jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
+ winnerEntitySet = new HashSet<TxnId>();
+ jobId2WinnerEntitiesMap.put(Integer.valueOf(jobId), winnerEntitySet);
+ } else {
+ winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+ }
+ winnerEntitySet.add(winnerEntity);
+ if (IS_DEBUG_MODE) {
+ entityCommitLogCount++;
+ }
+ break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
@@ -203,55 +219,48 @@
// [ redo phase ]
// - redo if
// 1) The TxnId is committed && --> guarantee durability
- // 2) lsn < commitLog's Lsn && --> deal with a case of pkHashValue collision
- // 3) lsn > maxDiskLastLsn of the index --> guarantee idempotance
+ // 2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
//-------------------------------------------------------------------------
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("[RecoveryMgr] in redo phase");
}
- //#. set log reader to the lowWaterMarkLsn again.
- logReader.initializeScan(lowWaterMarkLsn);
-
long resourceId;
long maxDiskLastLsn;
- long lsn = -1;
- long commitLsn = -1;
+ long LSN = -1;
ILSMIndex index = null;
LocalResource localResource = null;
ILocalResourceMetadata localResourceMetadata = null;
- Map<Long, Long> resourceId2MaxLsnMap = new HashMap<Long, Long>();
- TxnId jobLevelTxnId = new TxnId(-1, -1, -1);
- boolean foundWinnerTxn = false;
+ Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+ boolean foundWinner = false;
//#. get indexLifeCycleManager
IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
+ //#. set log reader to the lowWaterMarkLsn again.
+ logReader.initializeScan(lowWaterMarkLsn);
logRecord = logReader.next();
while (logRecord != null) {
- lsn = logRecord.getLSN();
- foundWinnerTxn = false;
if (LogManager.IS_DEBUG_MODE) {
System.out.println(logRecord.getLogRecordForDisplay());
}
+ LSN = logRecord.getLSN();
+ jobId = logRecord.getJobId();
+ foundWinner = false;
switch (logRecord.getLogType()) {
case LogType.UPDATE:
- tempKeyTxnId.setTxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
- jobLevelTxnId.setTxnId(logRecord.getJobId(), -1, -1);
- if (winnerTxnTable.containsKey(tempKeyTxnId)) {
- commitLsn = winnerTxnTable.get(tempKeyTxnId);
- if (lsn < commitLsn) {
- foundWinnerTxn = true;
- }
- } else if (winnerTxnTable.containsKey(jobLevelTxnId)) {
- commitLsn = winnerTxnTable.get(jobLevelTxnId);
- if (lsn < commitLsn) {
- foundWinnerTxn = true;
+ if (winnerJobSet.contains(Integer.valueOf(jobId))) {
+ foundWinner = true;
+ } else if (jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
+ winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+ tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize());
+ if (winnerEntitySet.contains(tempKeyTxnId)) {
+ foundWinner = true;
}
}
-
- if (foundWinnerTxn) {
+ if (foundWinner) {
resourceId = logRecord.getResourceId();
localResource = localResourceRepository.getResourceById(resourceId);
@@ -294,12 +303,12 @@
maxDiskLastLsn = abstractLSMIOCallback.getComponentLSN(index.getImmutableComponents());
//#. set resourceId and maxDiskLastLSN to the map
- resourceId2MaxLsnMap.put(resourceId, maxDiskLastLsn);
+ resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
} else {
- maxDiskLastLsn = resourceId2MaxLsnMap.get(resourceId);
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(Long.valueOf(resourceId));
}
- if (lsn > maxDiskLastLsn) {
+ if (LSN > maxDiskLastLsn) {
redo(logRecord);
if (IS_DEBUG_MODE) {
redoCount++;
@@ -316,12 +325,11 @@
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
-
logRecord = logReader.next();
}
//close all indexes
- Set<Long> resourceIdList = resourceId2MaxLsnMap.keySet();
+ Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
for (long r : resourceIdList) {
indexLifecycleManager.close(r);
}
@@ -332,8 +340,8 @@
LOGGER.info("[RecoveryMgr] recovery is completed.");
}
if (IS_DEBUG_MODE) {
- System.out.println("[RecoveryMgr] Count: Update/Commit/Redo = " + updateLogCount + "/" + commitLogCount
- + "/" + redoCount);
+ System.out.println("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
+ + entityCommitLogCount + "/" + jobCommitLogCount + "/" + redoCount);
}
}
@@ -537,15 +545,18 @@
@Override
public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException {
Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
- TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
int updateLogCount = 0;
- int commitLogCount = 0;
+ int entityCommitLogCount = 0;
+ int jobId = -1;
+ int abortedJobId = txnContext.getJobId().getId();
+ long currentLSN = -1;
+ TxnId loserEntity = null;
- // Obtain the first log record written by the Job
+ // Obtain the first/last log record LSNs written by the Job
long firstLSN = txnContext.getFirstLSN();
long lastLSN = txnContext.getLastLSN();
- //TODO: make sure that the lastLsn is not updated anymore by another thread belonging to the same job.
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
}
@@ -563,62 +574,62 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
}
- boolean reachedLastLog = false;
List<Long> undoLSNSet = null;
ILogReader logReader = logMgr.getLogReader(false);
logReader.initializeScan(firstLSN);
- ILogRecord logRecord = logReader.next();
- while (logRecord != null) {
- if (IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ ILogRecord logRecord = null;
+ while (currentLSN < lastLSN) {
+ logRecord = logReader.next();
+ if (logRecord == null) {
+ break;
+ } else {
+ if (IS_DEBUG_MODE) {
+ System.out.println(logRecord.getLogRecordForDisplay());
+ }
+ currentLSN = logRecord.getLSN();
}
-
- tempKeyTxnId.setTxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
+ jobId = logRecord.getJobId();
+ if (jobId != abortedJobId) {
+ continue;
+ }
+ tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(),
+ logRecord.getPKValueSize());
switch (logRecord.getLogType()) {
case LogType.UPDATE:
undoLSNSet = loserTxnTable.get(tempKeyTxnId);
if (undoLSNSet == null) {
- TxnId txnId = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(),
- logRecord.getPKHashValue());
+ loserEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize(), true);
undoLSNSet = new LinkedList<Long>();
- loserTxnTable.put(txnId, undoLSNSet);
+ loserTxnTable.put(loserEntity, undoLSNSet);
}
- undoLSNSet.add(logRecord.getLSN());
+ undoLSNSet.add(Long.valueOf(currentLSN));
if (IS_DEBUG_MODE) {
updateLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> update[" + logRecord.getLSN()
- + "]:" + tempKeyTxnId);
+ System.out.println("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+ + tempKeyTxnId);
}
break;
case LogType.JOB_COMMIT:
+ throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
+
case LogType.ENTITY_COMMIT:
- undoLSNSet = loserTxnTable.get(tempKeyTxnId);
- if (undoLSNSet != null) {
- loserTxnTable.remove(tempKeyTxnId);
- }
+ loserTxnTable.remove(tempKeyTxnId);
if (IS_DEBUG_MODE) {
- commitLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> commit[" + logRecord.getLSN()
- + "]" + tempKeyTxnId);
+ entityCommitLogCount++;
+ System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+ + tempKeyTxnId);
}
break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
- if (logRecord.getLSN() == lastLSN) {
- reachedLastLog = true;
- break;
- } else if (logRecord.getLSN() > lastLSN) {
- throw new IllegalStateException("LastLSN mismatch");
- }
- logRecord = logReader.next();
}
-
- if (!reachedLastLog) {
- throw new ACIDException("LastLSN mismatch: " + lastLSN + " vs " + logRecord.getLSN()
- + " during Rollback a transaction( " + txnContext.getJobId() + ")");
+ if (currentLSN != lastLSN) {
+ throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
+ + ") during abort( " + txnContext.getJobId() + ")");
}
//undo loserTxn's effect
@@ -626,7 +637,6 @@
LOGGER.info(" undoing loser transaction's effect");
}
- TxnId txnId = null;
Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
int undoCount = 0;
while (iter.hasNext()) {
@@ -634,16 +644,15 @@
//Sort the lsns in order to undo in one pass.
Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
- txnId = loserTxn.getKey();
-
undoLSNSet = loserTxn.getValue();
for (long undoLSN : undoLSNSet) {
- // here, all the log records are UPDATE type. So, we don't need to check the type again.
-
+ //here, all the log records are UPDATE type. So, we don't need to check the type again.
//read the corresponding log record to be undone.
logRecord = logReader.read(undoLSN);
- assert logRecord != null;
+ if (logRecord == null) {
+ throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
+ }
if (IS_DEBUG_MODE) {
System.out.println(logRecord.getLogRecordForDisplay());
}
@@ -660,8 +669,8 @@
LOGGER.info(" undone loser transaction's effect");
}
if (IS_DEBUG_MODE) {
- System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + commitLogCount + "/"
- + undoCount);
+ System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount
+ + "/" + undoCount);
}
}
@@ -724,36 +733,53 @@
}
class TxnId {
+ public boolean isByteArrayPKValue;
public int jobId;
public int datasetId;
- public int pkHashVal;
+ public int pkHashValue;
+ public int pkSize;
+ public byte[] byteArrayPKValue;
+ public ITupleReference tupleReferencePKValue;
- public TxnId(int jobId, int datasetId, int pkHashVal) {
+ public TxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize,
+ boolean isByteArrayPKValue) {
this.jobId = jobId;
this.datasetId = datasetId;
- this.pkHashVal = pkHashVal;
+ this.pkHashValue = pkHashValue;
+ this.pkSize = pkSize;
+ this.isByteArrayPKValue = isByteArrayPKValue;
+ if (isByteArrayPKValue) {
+ this.byteArrayPKValue = new byte[pkSize];
+ readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue);
+ } else {
+ this.tupleReferencePKValue = pkValue;
+ }
}
- public void setTxnId(int jobId, int datasetId, int pkHashVal) {
+ private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
+ int readOffset = pkValue.getFieldStart(0);
+ byte[] readBuffer = pkValue.getFieldData(0);
+ for (int i = 0; i < pkSize; i++) {
+ byteArrayPKValue[i] = readBuffer[readOffset + i];
+ }
+ }
+
+ public void setTxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize) {
this.jobId = jobId;
this.datasetId = datasetId;
- this.pkHashVal = pkHashVal;
- }
-
- public void setTxnId(TxnId txnId) {
- this.jobId = txnId.jobId;
- this.datasetId = txnId.datasetId;
- this.pkHashVal = txnId.pkHashVal;
+ this.pkHashValue = pkHashValue;
+ this.tupleReferencePKValue = pkValue;
+ isByteArrayPKValue = false;
}
@Override
public String toString() {
- return "[" + jobId + "," + datasetId + "," + pkHashVal + "]";
+ return "[" + jobId + "," + datasetId + "," + pkHashValue + "," + pkSize + "]";
}
@Override
public int hashCode() {
- return pkHashVal;
+ return pkHashValue;
}
@Override
@@ -765,7 +791,52 @@
return false;
}
TxnId txnId = (TxnId) o;
+ return (txnId.pkHashValue == pkHashValue && txnId.datasetId == datasetId && txnId.jobId == jobId
+ && pkSize == txnId.pkSize && isEqualTo(txnId));
+ }
- return (txnId.pkHashVal == pkHashVal && txnId.datasetId == datasetId && txnId.jobId == jobId);
+ private boolean isEqualTo(TxnId txnId) {
+ if (isByteArrayPKValue && txnId.isByteArrayPKValue) {
+ return isEqual(byteArrayPKValue, txnId.byteArrayPKValue, pkSize);
+ } else if (isByteArrayPKValue && (!txnId.isByteArrayPKValue)) {
+ return isEqual(byteArrayPKValue, txnId.tupleReferencePKValue, pkSize);
+ } else if ((!isByteArrayPKValue) && txnId.isByteArrayPKValue) {
+ return isEqual(txnId.byteArrayPKValue, tupleReferencePKValue, pkSize);
+ } else {
+ return isEqual(tupleReferencePKValue, txnId.tupleReferencePKValue, pkSize);
+ }
+ }
+
+ private boolean isEqual(byte[] a, byte[] b, int size) {
+ for (int i = 0; i < size; i++) {
+ if (a[i] != b[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isEqual(byte[] a, ITupleReference b, int size) {
+ int readOffset = b.getFieldStart(0);
+ byte[] readBuffer = b.getFieldData(0);
+ for (int i = 0; i < size; i++) {
+ if (a[i] != readBuffer[readOffset + i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isEqual(ITupleReference a, ITupleReference b, int size) {
+ int aOffset = a.getFieldStart(0);
+ byte[] aBuffer = a.getFieldData(0);
+ int bOffset = b.getFieldStart(0);
+ byte[] bBuffer = b.getFieldData(0);
+ for (int i = 0; i < size; i++) {
+ if (aBuffer[aOffset + i] != bBuffer[bOffset + i]) {
+ return false;
+ }
+ }
+ return true;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 01bce83..01b38c2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -29,7 +29,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
@@ -60,7 +59,8 @@
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(msg);
}
- throw new Error(msg, ae);
+ ae.printStackTrace();
+ throw new ACIDException(msg, ae);
} finally {
txnSubsystem.getLockManager().releaseLocks(txnCtx);
transactionContextRepository.remove(txnCtx.getJobId());
@@ -90,20 +90,11 @@
@Override
public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException {
- //There is either job-level commit or entity-level commit.
- //The job-level commit will have -1 value both for datasetId and PKHashVal.
-
- //for entity-level commit
- if (PKHashVal != -1) {
- txnSubsystem.getLockManager().unlock(datasetId, PKHashVal, txnCtx, true);
- return;
- }
-
- //for job-level commit
+ //Only job-level commits call this method.
try {
if (txnCtx.isWriteTxn()) {
LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
- logRecord.formCommitLogRecord(txnCtx, LogType.JOB_COMMIT, txnCtx.getJobId().getId(), -1, -1);
+ logRecord.formJobCommitLogRecord(txnCtx);
txnSubsystem.getLogManager().log(logRecord);
}
} catch (Exception ae) {