fixed bugs related to 1)job-level-commit(which is used for DDL) log and 2) sharp checkpoint
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1416 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 1d04cd7..d9c6ed4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -63,6 +63,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext.TransactionType;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -71,6 +72,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -269,6 +271,9 @@
IIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+ TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ txnCtx.setTransactionType(TransactionType.READ_WRITE);
+
// TODO: fix exceptions once new BTree exception model is in hyracks.
indexAccessor.insert(tuple);
@@ -567,6 +572,10 @@
IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
lsmIndex, IndexOperation.DELETE);
IIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+
+ TransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ txnCtx.setTransactionType(TransactionType.READ_WRITE);
+
indexAccessor.delete(tuple);
indexLifecycleManager.close(resourceID);
}
@@ -590,6 +599,7 @@
@Override
public Dataverse getDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
+
try {
ITupleReference searchKey = createTuple(dataverseName);
DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(false);
@@ -843,6 +853,86 @@
return results.get(0);
}
+ //Debugging Method
+ public String printMetadata() {
+
+ StringBuilder sb = new StringBuilder();
+ try {
+ IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
+ long resourceID = index.getResourceID();
+ IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
+ indexLifecycleManager.open(resourceID);
+ IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
+
+ RangePredicate rangePred = null;
+ rangePred = new RangePredicate(null, null, true, true, null, null);
+ indexAccessor.search(rangeCursor, rangePred);
+ try {
+ while (rangeCursor.hasNext()) {
+ rangeCursor.next();
+ sb.append(TupleUtils.printTuple(rangeCursor.getTuple(),
+ new ISerializerDeserializer[] { AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING) }));
+ }
+ } finally {
+ rangeCursor.close();
+ }
+ indexLifecycleManager.close(resourceID);
+
+ index = MetadataPrimaryIndexes.DATASET_DATASET;
+ resourceID = index.getResourceID();
+ indexInstance = indexLifecycleManager.getIndex(resourceID);
+ indexLifecycleManager.open(resourceID);
+ indexAccessor = indexInstance
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
+
+ rangePred = null;
+ rangePred = new RangePredicate(null, null, true, true, null, null);
+ indexAccessor.search(rangeCursor, rangePred);
+ try {
+ while (rangeCursor.hasNext()) {
+ rangeCursor.next();
+ sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) }));
+ }
+ } finally {
+ rangeCursor.close();
+ }
+ indexLifecycleManager.close(resourceID);
+
+ index = MetadataPrimaryIndexes.INDEX_DATASET;
+ resourceID = index.getResourceID();
+ indexInstance = indexLifecycleManager.getIndex(resourceID);
+ indexLifecycleManager.open(resourceID);
+ indexAccessor = indexInstance
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
+
+ rangePred = null;
+ rangePred = new RangePredicate(null, null, true, true, null, null);
+ indexAccessor.search(rangeCursor, rangePred);
+ try {
+ while (rangeCursor.hasNext()) {
+ rangeCursor.next();
+ sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) }));
+ }
+ } finally {
+ rangeCursor.close();
+ }
+ indexLifecycleManager.close(resourceID);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return sb.toString();
+ }
+
private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index f3c33f8..242f228 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -697,6 +697,7 @@
closeLogPages();
initLSN();
openLogPages();
+ logPageFlusher.renew();
}
private PhysicalLogLocator initLSN() throws ACIDException {
@@ -858,8 +859,9 @@
*/
private final LinkedBlockingQueue<Object>[] flushRequestQueue;
private final Object[] flushRequests;
- private int lastFlushedPageIndex;
+ private int pageToFlush;
private final long groupCommitWaitPeriod;
+ private boolean isRenewRequest;
public LogPageFlushThread(LogManager logManager) {
this.logManager = logManager;
@@ -871,8 +873,16 @@
flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
flushRequests[i] = new Object();
}
- this.lastFlushedPageIndex = -1;
+ this.pageToFlush = -1;
groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
+ isRenewRequest = false;
+ }
+
+ public void renew() {
+ isRenewRequest = true;
+ pageToFlush = -1;
+ this.interrupt();
+ isRenewRequest = false;
}
public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
@@ -908,12 +918,19 @@
public void run() {
while (true) {
try {
- int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex);
+ pageToFlush = logManager.getNextPageInSequence(pageToFlush);
// A wait call on the linkedBLockingQueue. The flusher thread is
// notified when an object is added to the queue. Please note
// that each page has an associated blocking queue.
- flushRequestQueue[pageToFlush].take();
+ try {
+ flushRequestQueue[pageToFlush].take();
+ } catch (InterruptedException ie) {
+ while (isRenewRequest) {
+ sleep(1);
+ }
+ continue;
+ }
synchronized (logManager.getLogPage(pageToFlush)) {
@@ -954,7 +971,6 @@
// increment the last flushed lsn and lastFlushedPage
logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
- lastFlushedPageIndex = pageToFlush;
// decrement activeTxnCountOnIndexes
logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index ed88112..c73f693 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -241,13 +241,15 @@
long resourceId;
byte resourceMgrId;
long maxDiskLastLSN;
- long currentLSN;
+ long currentLSN = -1;
int resourceType;
ILSMIndex index = null;
LocalResource localResource = null;
ILocalResourceMetadata localResourceMetadata = null;
Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
List<ILSMComponent> immutableDiskIndexList = null;
+ TxnId jobLevelTxnId = new TxnId(-1, -1, -1);
+ boolean foundWinnerTxn;
//#. get indexLifeCycleManager
IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
@@ -256,7 +258,7 @@
//#. redo
while (logCursor.next(currentLogLocator)) {
-
+ foundWinnerTxn = false;
if (LogManager.IS_DEBUG_MODE) {
System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
}
@@ -268,9 +270,16 @@
tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
logRecordHelper.getDatasetId(currentLogLocator),
logRecordHelper.getPKHashValue(currentLogLocator));
-
+ jobLevelTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), -1, -1);
if (winnerTxnTable.containsKey(tempKeyTxnId)) {
currentLSN = winnerTxnTable.get(tempKeyTxnId);
+ foundWinnerTxn = true;
+ } else if (winnerTxnTable.containsKey(jobLevelTxnId)) {
+ currentLSN = winnerTxnTable.get(jobLevelTxnId);
+ foundWinnerTxn = true;
+ }
+
+ if (foundWinnerTxn) {
resourceId = logRecordHelper.getResourceId(currentLogLocator);
localResource = localResourceRepository.getResourceById(resourceId);
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
index 260e5ff..162660f 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
@@ -75,7 +75,7 @@
public static void main(String[] args) throws ACIDException, Exception {
Properties props = new Properties();
props.setProperty(LogManagerProperties.LOG_DIR_KEY,
- "/home/kisskys/workspace/lsm_merge/asterix_lsm_stabilization/debug/asterix_logs/nc1");
+ "/Users/kisskys/workspace/sanitycheck_lsm_merge/asterix_lsm_stabilization/asterix-app/asterix_logs/nc1");
LogManagerProperties logProps = new LogManagerProperties(props);
LogManager logManager = new LogManager(null, logProps);
LogRecordReader logReader = new LogRecordReader(logManager);