| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (working copy) |
| @@ -103,12 +103,14 @@ |
| //for entity-level commit |
| if (PKHashVal != -1) { |
| transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true); |
| + /***************************** |
| try { |
| //decrease the transaction reference count on index |
| txnContext.decreaseActiveTransactionCountOnIndexes(); |
| } catch (HyracksDataException e) { |
| throw new ACIDException("failed to complete index operation", e); |
| } |
| + *****************************/ |
| return; |
| } |
| |
| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (working copy) |
| @@ -19,6 +19,7 @@ |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| +import java.util.concurrent.atomic.AtomicInteger; |
| |
| import edu.uci.ics.asterix.transaction.management.exception.ACIDException; |
| import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback; |
| @@ -169,5 +170,14 @@ |
| closeable.close(this); |
| } |
| } |
| + |
| + @Override |
| + public int hashCode() { |
| + return jobId.getId(); |
| + } |
| |
| + @Override |
| + public boolean equals(Object o) { |
| + return (o == this); |
| + } |
| } |
| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (working copy) |
| @@ -567,7 +567,7 @@ |
| if (commitFlag) { |
| if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) { |
| try { |
| - txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(), |
| + txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(), |
| entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator); |
| } catch (ACIDException e) { |
| try { |
| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (working copy) |
| @@ -75,6 +75,7 @@ |
| buffer.position(0); |
| buffer.limit(size); |
| fileChannel.write(buffer); |
| + fileChannel.force(false); |
| erase(); |
| } |
| |
| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (working copy) |
| @@ -1,5 +1,5 @@ |
| /* |
| - * Copyright 2009-2010 by The Regents of the University of California |
| + * Copyright 2009-2012 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| @@ -21,7 +21,12 @@ |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| +import java.util.ArrayList; |
| +import java.util.HashMap; |
| +import java.util.List; |
| +import java.util.Map; |
| import java.util.Properties; |
| +import java.util.Set; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| @@ -30,22 +35,25 @@ |
| |
| import edu.uci.ics.asterix.transaction.management.exception.ACIDException; |
| import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject; |
| +import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus; |
| +import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem; |
| +import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| |
| public class LogManager implements ILogManager { |
| |
| public static final boolean IS_DEBUG_MODE = false;//true |
| private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName()); |
| - private TransactionSubsystem provider; |
| + private final TransactionSubsystem provider; |
| private LogManagerProperties logManagerProperties; |
| + private LogPageFlushThread logPageFlusher; |
| |
| /* |
| * the array of log pages. The number of log pages is configurable. Pages |
| * taken together form an in-memory log buffer. |
| */ |
| - |
| private IFileBasedBuffer[] logPages; |
| |
| private ILogRecordHelper logRecordHelper; |
| @@ -54,6 +62,7 @@ |
| * Number of log pages that constitute the in-memory log buffer. |
| */ |
| private int numLogPages; |
| + |
| /* |
| * Initially all pages have an owner count of 1 that is the LogManager. When |
| * a transaction requests to write in a log page, the owner count is |
| @@ -62,12 +71,11 @@ |
| * (covering the whole log record). When the content has been put, the log |
| * manager computes the checksum and puts it after the content. At this |
| * point, the ownership count is decremented as the transaction is done with |
| - * using the page. When a page is full, the log manager decrements the count |
| - * by one indicating that it has released its ownership of the log page. |
| - * There could be other transaction(s) still owning the page (that is they |
| - * could still be mid-way putting the log content). When the ownership count |
| - * eventually reaches zero, the thread responsible for flushing the log page |
| - * is notified and the page is flushed to disk. |
| + * using the page. When a page is requested to be flushed, logPageFlusher |
| + * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed) |
| + * only if the count is 1(LOG_WRITER: meaning that there is no other |
| + * transactions who own the page to write logs.) After flushing the page, |
| + * logPageFlusher set this count to 1. |
| */ |
| private AtomicInteger[] logPageOwnerCount; |
| |
| @@ -78,18 +86,16 @@ |
| |
| /* |
| * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each |
| - * page is maintained in a map called logPageStatus. A page is ACTIVE when |
| - * the LogManager can allocate space in the page for writing a log record. |
| - * Initially all pages are ACTIVE. As transactions fill up space by writing |
| - * log records, a page may not have sufficient space left for serving a |
| - * request by a transaction. When this happens, the page is marked INACTIVE. |
| - * An INACTIVE page with no owners ( logPageOwnerCount.get(<pageIndex>) == |
| - * 0) indicates that the page must be flushed to disk before any other log |
| - * record is written on the page.F |
| + * page is maintained in logPageStatus. A page is ACTIVE when the LogManager |
| + * can allocate space in the page for writing a log record. Initially all |
| + * pages are ACTIVE. As transactions fill up space by writing log records, |
| + * a page may not have sufficient space left for serving a request by a |
| + * transaction. When this happens, the page is flushed to disk by calling |
| + * logPageFlusher.requestFlush(). In the requestFlush(), after groupCommitWaitTime, |
| + * the page status is set to INACTIVE. Then, there is no more writer on the |
| + * page(meaning the corresponding logPageOwnerCount is 1), the page is flushed |
| + * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher. |
| */ |
| - |
| - // private Map<Integer, Integer> logPageStatus = new |
| - // ConcurrentHashMap<Integer, Integer>(); |
| private AtomicInteger[] logPageStatus; |
| |
| static class PageState { |
| @@ -98,41 +104,8 @@ |
| } |
| |
| private AtomicLong lastFlushedLsn = new AtomicLong(-1); |
| - private AtomicInteger lastFlushedPage = new AtomicInteger(-1); |
| |
| /* |
| - * pendingFlushRequests is a map with key as Integer denoting the page |
| - * index. When a (transaction) thread discovers the need to flush a page, it |
| - * puts its Thread object into the corresponding value that is a |
| - * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans |
| - * this map in order of page index (and circling around). The flusher thread |
| - * needs to flush pages in order and waits for a thread to deposit an object |
| - * in the blocking queue corresponding to the next page in order. A request |
| - * to flush a page is conveyed to the flush thread by simply depositing an |
| - * object in to corresponding blocking queue. It is blocking in the sense |
| - * that the flusher thread will continue to wait for an object to arrive in |
| - * the queue. The object itself is ignored by the fliusher and just acts as |
| - * a signal/event that a page needs to be flushed. |
| - */ |
| - |
| - private LinkedBlockingQueue[] pendingFlushRequests; |
| - |
| - /* |
| - * ICommitResolver is an interface that provides an API that can answer a |
| - * simple boolean - Given the commit requests so far, should a page be |
| - * flushed. The implementation of the interface contains the logic (or you |
| - * can say the policy) for commit. It could be group commit in which case |
| - * the commit resolver may not return a true indicating that it wishes to |
| - * delay flushing of the page. |
| - */ |
| - private ICommitResolver commitResolver; |
| - |
| - /* |
| - * An object that keeps track of the submitted commit requests. |
| - */ |
| - private CommitRequestStatistics commitRequestStatistics; |
| - |
| - /* |
| * When the transaction eco-system comes to life, the log manager positions |
| * itself to the end of the last written log. the startingLsn represent the |
| * lsn value of the next log record to be written after a system (re)start. |
| @@ -146,16 +119,10 @@ |
| */ |
| private AtomicLong lsn = new AtomicLong(0); |
| |
| - /* |
| - * A map that tracks the flush requests submitted for each page. The |
| - * requests for a page are cleared when the page is flushed. |
| - */ |
| - public LinkedBlockingQueue<Thread> getPendingFlushRequests(int pageIndex) { |
| - return pendingFlushRequests[pageIndex]; |
| - } |
| + private List<HashMap<TransactionContext, Integer>> activeTxnCountMaps; |
| |
| - public void addFlushRequest(int pageIndex) { |
| - pendingFlushRequests[pageIndex].add(pendingFlushRequests); |
| + public void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) { |
| + logPageFlusher.requestFlush(pageIndex, lsn, isSynchronous); |
| } |
| |
| public AtomicLong getLastFlushedLsn() { |
| @@ -233,19 +200,12 @@ |
| numLogPages = logManagerProperties.getNumLogPages(); |
| logPageOwnerCount = new AtomicInteger[numLogPages]; |
| logPageStatus = new AtomicInteger[numLogPages]; |
| - pendingFlushRequests = new LinkedBlockingQueue[numLogPages]; |
| - if (logManagerProperties.getGroupCommitWaitPeriod() > 0) { // configure |
| - // the |
| - // Commit |
| - // Resolver |
| - commitResolver = new GroupCommitResolver(); // Group Commit is |
| - // enabled |
| - commitRequestStatistics = new CommitRequestStatistics(numLogPages); |
| - } else { |
| - commitResolver = new BasicCommitResolver(); // the basic commit |
| - // resolver |
| + |
| + activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages); |
| + for (int i = 0; i < numLogPages; i++) { |
| + activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>()); |
| } |
| - this.commitResolver.init(this); // initialize the commit resolver |
| + |
| logPages = new FileBasedBuffer[numLogPages]; |
| |
| /* |
| @@ -264,7 +224,6 @@ |
| for (int i = 0; i < numLogPages; i++) { |
| logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER); |
| logPageStatus[i] = new AtomicInteger(PageState.ACTIVE); |
| - pendingFlushRequests[i] = new LinkedBlockingQueue<Thread>(); |
| } |
| |
| /* |
| @@ -278,9 +237,9 @@ |
| * daemon thread so that it does not stop the JVM from exiting when all |
| * other threads are done with their work. |
| */ |
| - LogPageFlushThread logFlusher = new LogPageFlushThread(this); |
| - logFlusher.setDaemon(true); |
| - logFlusher.start(); |
| + logPageFlusher = new LogPageFlushThread(this); |
| + logPageFlusher.setDaemon(true); |
| + logPageFlusher.start(); |
| } |
| |
| public int getLogPageIndex(long lsnValue) { |
| @@ -312,7 +271,7 @@ |
| */ |
| private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException { |
| if (logPageStatus[pageIndex].get() == PageState.ACTIVE |
| - && getLogPageOwnershipCount(pageIndex).get() >= PageOwnershipStatus.LOG_WRITER) { |
| + && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) { |
| return; |
| } |
| try { |
| @@ -338,47 +297,40 @@ |
| */ |
| private long getLsn(int entrySize, byte logType) throws ACIDException { |
| long pageSize = logManagerProperties.getLogPageSize(); |
| - boolean requiresFlushing = logType == LogType.COMMIT; |
| + |
| while (true) { |
| boolean forwardPage = false; |
| - boolean shouldFlushPage = false; |
| long old = lsn.get(); |
| - int pageIndex = getLogPageIndex(old); // get the log page |
| - // corresponding to the |
| - // current lsn value |
| + |
| + //get the log page corresponding to the current lsn value |
| + int pageIndex = getLogPageIndex(old); |
| long retVal = old; |
| - long next = old + entrySize; // the lsn value for the next request, |
| - // if the current request is served. |
| + |
| + // the lsn value for the next request if the current request is served. |
| + long next = old + entrySize; |
| int prevPage = -1; |
| - if ((next - 1) / pageSize != old / pageSize // check if the log |
| - // record will cross |
| - // page boundaries, a |
| - // case that is not |
| - // allowed. |
| - || (next % pageSize == 0)) { |
| + |
| + // check if the log record will cross page boundaries, a case that is not allowed. |
| + if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) { |
| + |
| if ((old != 0 && old % pageSize == 0)) { |
| - retVal = old; // On second thought, this shall never be the |
| - // case as it means that the lsn is |
| - // currently at the beginning of a page and |
| - // we still need to forward the page which |
| - // means that the entrySize exceeds a log |
| - // page size. If this is the case, an |
| - // exception is thrown before calling this |
| - // API. |
| - // would remove this case. |
| + // On second thought, this shall never be the case as it means that the lsn is |
| + // currently at the beginning of a page and we still need to forward the page which |
| + // means that the entrySize exceeds a log page size. If this is the case, an |
| + // exception is thrown before calling this API. would remove this case. |
| + retVal = old; |
| |
| } else { |
| - retVal = ((old / pageSize) + 1) * pageSize; // set the lsn |
| - // to point to |
| - // the beginning |
| - // of the next |
| - // page. |
| + // set the lsn to point to the beginning of the next page. |
| + retVal = ((old / pageSize) + 1) * pageSize; |
| } |
| + |
| next = retVal; |
| - forwardPage = true; // as the log record shall cross log page |
| - // boundary, we must re-assign the lsn (so |
| - // that the log record begins on a different |
| - // location. |
| + |
| + // as the log record shall cross log page boundary, we must re-assign the lsn so |
| + // that the log record begins on a different location. |
| + forwardPage = true; |
| + |
| prevPage = pageIndex; |
| pageIndex = getNextPageInSequence(pageIndex); |
| } |
| @@ -397,109 +349,51 @@ |
| */ |
| waitUntillPageIsAvailableForWritingLog(pageIndex); |
| |
| - if (!forwardPage && requiresFlushing) { |
| - shouldFlushPage = commitResolver.shouldCommitPage(pageIndex, this, commitRequestStatistics); |
| - if (shouldFlushPage) { |
| - next = ((next / pageSize) + 1) * pageSize; /* |
| - * next |
| - * represents the |
| - * next value of |
| - * lsn after this |
| - * log record has |
| - * been written. |
| - * If the page |
| - * needs to be |
| - * flushed, then |
| - * we do not give |
| - * any more LSNs |
| - * from this |
| - * page. |
| - */ |
| - } |
| - } |
| - if (!lsn.compareAndSet(old, next)) { // Atomic call -> returns true |
| - // only when the value |
| - // represented by lsn is same as |
| - // "old". The value is updated |
| - // to "next". |
| + if (!lsn.compareAndSet(old, next)) { |
| + // Atomic call -> returns true only when the value represented by lsn is same as |
| + // "old". The value is updated to "next". |
| continue; |
| } |
| |
| if (forwardPage) { |
| - //TODO |
| - //this is not safe since the incoming thread may reach the same page slot with this page |
| - //(differ by the log buffer size) |
| - logPageStatus[prevPage].set(PageState.INACTIVE); // mark |
| - // previous |
| - // page |
| - // inactive |
| + addFlushRequest(prevPage, old, false); |
| |
| - /* |
| - * decrement on the behalf of the log manager. if there are no |
| - * more owners (count == 0) the page must be marked as a |
| - * candidate to be flushed. |
| - */ |
| - int pageDirtyCount = getLogPageOwnershipCount(prevPage).decrementAndGet(); |
| - if (pageDirtyCount == 0) { |
| - addFlushRequest(prevPage); |
| - } |
| - |
| - /* |
| - * The transaction thread that discovers the need to forward a |
| - * page is made to re-acquire a lsn. |
| - */ |
| + // The transaction thread that discovers the need to forward a |
| + // page is made to re-acquire a lsn. |
| continue; |
| + |
| } else { |
| - /* |
| - * the transaction thread has been given a space in a log page, |
| - * but is made to wait until the page is available. |
| - */ |
| + // the transaction thread has been given a space in a log page, |
| + // but is made to wait until the page is available. |
| + // (Is this needed? when does this wait happen?) |
| waitUntillPageIsAvailableForWritingLog(pageIndex); |
| - /* |
| - * increment the counter as the transaction thread now holds a |
| - * space in the log page and hence is an owner. |
| - */ |
| + |
| + // increment the counter as the transaction thread now holds a |
| + // space in the log page and hence is an owner. |
| logPageOwnerCount[pageIndex].incrementAndGet(); |
| - } |
| - if (requiresFlushing) { |
| - if (!shouldFlushPage) { |
| - /* |
| - * the log record requires the page to be flushed but under |
| - * the commit policy, the flush task has been deferred. The |
| - * transaction thread submits its request to flush the page. |
| - */ |
| - commitRequestStatistics.registerCommitRequest(pageIndex); |
| - } else { |
| - /* |
| - * the flush request was approved by the commit resolver. |
| - * Thus the page is marked INACTIVE as no more logs will be |
| - * written on this page. The log manager needs to release |
| - * its ownership. Note that transaction threads may still |
| - * continue to be owners of the log page till they fill up |
| - * the space allocated to them. |
| - */ |
| - logPageStatus[pageIndex].set(PageState.INACTIVE); |
| - logPageOwnerCount[pageIndex].decrementAndGet(); // on |
| - // the |
| - // behalf |
| - // of |
| - // log |
| - // manager |
| + |
| + // Before the count is incremented, if the flusher flushed the allocated page, |
| + // then retry to get new LSN. Otherwise, the log with allocated lsn will be lost. |
| + if (lastFlushedLsn.get() >= retVal) { |
| + logPageOwnerCount[pageIndex].decrementAndGet(); |
| + continue; |
| } |
| } |
| + |
| return retVal; |
| } |
| } |
| |
| @Override |
| - public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId, |
| + public void log(byte logType, TransactionContext txnCtx, int datasetId, int PKHashValue, long resourceId, |
| byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger, |
| LogicalLogLocator logicalLogLocator) throws ACIDException { |
| - /* |
| - * logLocator is a re-usable object that is appropriately set in each |
| - * invocation. If the reference is null, the log manager must throw an |
| - * exception |
| - */ |
| + |
| + HashMap<TransactionContext, Integer> map = null; |
| + int activeTxnCount; |
| + |
| + // logLocator is a re-usable object that is appropriately set in each invocation. |
| + // If the reference is null, the log manager must throw an exception. |
| if (logicalLogLocator == null) { |
| throw new ACIDException( |
| " you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +" |
| @@ -519,20 +413,19 @@ |
| |
| // all constraints checked and we are good to go and acquire a lsn. |
| long previousLSN = -1; |
| - long currentLSN; // the will be set to the location (a long value) |
| - // where the log record needs to be placed. |
| |
| - /* |
| - * The logs written by a transaction need to be linked to each other for |
| - * a successful rollback/recovery. However there could be multiple |
| - * threads operating concurrently that are part of a common transaction. |
| - * These threads need to synchronize and record the lsn corresponding to |
| - * the last log record written by (any thread of) the transaction. |
| - */ |
| - synchronized (context) { |
| - previousLSN = context.getLastLogLocator().getLsn(); |
| + // the will be set to the location (a long value) where the log record needs to be placed. |
| + long currentLSN; |
| + |
| + // The logs written by a transaction need to be linked to each other for |
| + // a successful rollback/recovery. However there could be multiple |
| + // threads operating concurrently that are part of a common transaction. |
| + // These threads need to synchronize and record the lsn corresponding to |
| + // the last log record written by (any thread of) the transaction. |
| + synchronized (txnCtx) { |
| + previousLSN = txnCtx.getLastLogLocator().getLsn(); |
| currentLSN = getLsn(totalLogSize, logType); |
| - context.setLastLSN(currentLSN); |
| + txnCtx.setLastLSN(currentLSN); |
| if (IS_DEBUG_MODE) { |
| System.out.println("--------------> LSN(" + currentLSN + ") is allocated"); |
| } |
| @@ -547,48 +440,37 @@ |
| * performed correctly that is ownership is released. |
| */ |
| |
| - boolean decremented = false; // indicates if the transaction thread |
| - // has release ownership of the |
| - // page. |
| - boolean addedFlushRequest = false; // indicates if the transaction |
| - // thread has submitted a flush |
| - // request. |
| + // indicates if the transaction thread has release ownership of the page. |
| + boolean decremented = false; |
| |
| int pageIndex = (int) getLogPageIndex(currentLSN); |
| |
| - /* |
| - * the lsn has been obtained for the log record. need to set the |
| - * LogLocator instance accordingly. |
| - */ |
| - |
| + // the lsn has been obtained for the log record. need to set the |
| + // LogLocator instance accordingly. |
| try { |
| - |
| logicalLogLocator.setBuffer(logPages[pageIndex]); |
| int pageOffset = getLogPageOffset(currentLSN); |
| logicalLogLocator.setMemoryOffset(pageOffset); |
| |
| - /* |
| - * write the log header. |
| - */ |
| - logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN, |
| + // write the log header. |
| + logRecordHelper.writeLogHeader(logicalLogLocator, logType, txnCtx, datasetId, PKHashValue, previousLSN, |
| resourceId, resourceMgrId, logContentSize); |
| |
| // increment the offset so that the transaction can fill up the |
| // content in the correct region of the allocated space. |
| logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType)); |
| |
| - // a COMMIT log record does not have any content |
| - // and hence the logger (responsible for putting the log content) is |
| - // not invoked. |
| + // a COMMIT log record does not have any content and hence |
| + // the logger (responsible for putting the log content) is not invoked. |
| if (logContentSize != 0) { |
| - logger.preLog(context, reusableLogContentObject); |
| + logger.preLog(txnCtx, reusableLogContentObject); |
| } |
| |
| if (logContentSize != 0) { |
| // call the logger implementation and ask to fill in the log |
| // record content at the allocated space. |
| - logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject); |
| - logger.postLog(context, reusableLogContentObject); |
| + logger.log(txnCtx, logicalLogLocator, logContentSize, reusableLogContentObject); |
| + logger.postLog(txnCtx, reusableLogContentObject); |
| if (IS_DEBUG_MODE) { |
| logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() |
| - logRecordHelper.getLogHeaderSize(logType)); |
| @@ -597,10 +479,8 @@ |
| } |
| } |
| |
| - /* |
| - * The log record has been written. For integrity checks, compute |
| - * the checksum and put it at the end of the log record. |
| - */ |
| + // The log record has been written. For integrity checks, compute |
| + // the checksum and put it at the end of the log record. |
| int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType); |
| int length = totalLogSize - logRecordHelper.getLogChecksumSize(); |
| long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length); |
| @@ -611,46 +491,31 @@ |
| System.out.println("--------------> LSN(" + currentLSN + ") is written"); |
| } |
| |
| - /* |
| - * release the ownership as the log record has been placed in |
| - * created space. |
| - */ |
| - int pageDirtyCount = logPageOwnerCount[pageIndex].decrementAndGet(); |
| + // release the ownership as the log record has been placed in created space. |
| + logPageOwnerCount[pageIndex].decrementAndGet(); |
| |
| // indicating that the transaction thread has released ownership |
| decremented = true; |
| |
| - /* |
| - * If the transaction thread happens to be the last owner of the log |
| - * page the page must by marked as a candidate to be flushed. |
| - */ |
| - if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) { |
| - addFlushRequest(pageIndex); |
| - addedFlushRequest = true; |
| - } |
| - |
| - /* |
| - * If the log type is commit, a flush request is registered, if the |
| - * log record has not reached the disk. It may be possible that this |
| - * thread does not get CPU cycles and in-between the log record has |
| - * been flushed to disk because the containing log page filled up. |
| - */ |
| - if (logType == LogType.COMMIT) { |
| - synchronized (logPages[pageIndex]) { |
| - while (getLastFlushedLsn().get() < currentLSN) { |
| - logPages[pageIndex].wait(); |
| - } |
| + if (logType == LogType.ENTITY_COMMIT) { |
| + map = activeTxnCountMaps.get(pageIndex); |
| + if (map.containsKey(txnCtx)) { |
| + activeTxnCount = (Integer) map.get(txnCtx); |
| + activeTxnCount++; |
| + map.put(txnCtx, activeTxnCount); |
| + } else { |
| + map.put(txnCtx, 1); |
| } |
| + addFlushRequest(pageIndex, currentLSN, false); |
| + } else if (logType == LogType.COMMIT) { |
| + addFlushRequest(pageIndex, currentLSN, true); |
| } |
| |
| } catch (Exception e) { |
| e.printStackTrace(); |
| - throw new ACIDException(context, "Thread: " + Thread.currentThread().getName() |
| + throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName() |
| + " logger encountered exception", e); |
| } finally { |
| - /* |
| - * If an exception was encountered and we did not release ownership |
| - */ |
| if (!decremented) { |
| logPageOwnerCount[pageIndex].decrementAndGet(); |
| } |
| @@ -667,9 +532,6 @@ |
| |
| logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition), |
| logManagerProperties.getLogPageSize()); |
| - |
| - //TODO Check if this is necessary |
| - //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0); |
| } |
| |
| @Override |
| @@ -747,16 +609,13 @@ |
| //minimize memory allocation overhead. current code allocates the log page size per reading a log record. |
| |
| byte[] pageContent = new byte[logManagerProperties.getLogPageSize()]; |
| - // take a lock on the log page so that the page is not flushed to |
| - // disk interim |
| + |
| + // take a lock on the log page so that the page is not flushed to disk interim |
| synchronized (logPages[pageIndex]) { |
| - if (lsnValue > getLastFlushedLsn().get()) { // need to check |
| - // again |
| - // (this |
| - // thread may have got |
| - // de-scheduled and must |
| - // refresh!) |
| |
| + // need to check again (this thread may have got de-scheduled and must refresh!) |
| + if (lsnValue > getLastFlushedLsn().get()) { |
| + |
| // get the log record length |
| logPages[pageIndex].getBytes(pageContent, 0, pageContent.length); |
| byte logType = pageContent[pageOffset + 4]; |
| @@ -765,9 +624,7 @@ |
| int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize(); |
| logRecord = new byte[logRecordSize]; |
| |
| - /* |
| - * copy the log record content |
| - */ |
| + // copy the log record content |
| System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize); |
| MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord); |
| if (logicalLogLocator == null) { |
| @@ -790,9 +647,7 @@ |
| } |
| } |
| |
| - /* |
| - * the log record is residing on the disk, read it from there. |
| - */ |
| + // the log record is residing on the disk, read it from there. |
| readDiskLog(lsnValue, logicalLogLocator); |
| } |
| |
| @@ -860,30 +715,40 @@ |
| return logPageOwnerCount[pageIndex]; |
| } |
| |
| - public ICommitResolver getCommitResolver() { |
| - return commitResolver; |
| - } |
| - |
| - public CommitRequestStatistics getCommitRequestStatistics() { |
| - return commitRequestStatistics; |
| - } |
| - |
| public IFileBasedBuffer[] getLogPages() { |
| return logPages; |
| } |
| |
| - public int getLastFlushedPage() { |
| - return lastFlushedPage.get(); |
| - } |
| - |
| - public void setLastFlushedPage(int lastFlushedPage) { |
| - this.lastFlushedPage.set(lastFlushedPage); |
| - } |
| - |
| @Override |
| public TransactionSubsystem getTransactionSubsystem() { |
| return provider; |
| } |
| + |
| + public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException { |
| + TransactionContext ctx = null; |
| + int count = 0; |
| + int i = 0; |
| + |
| + HashMap<TransactionContext, Integer> map = activeTxnCountMaps.get(pageIndex); |
| + Set<Map.Entry<TransactionContext, Integer>> entrySet = map.entrySet(); |
| + if (entrySet != null) { |
| + for (Map.Entry<TransactionContext, Integer> entry : entrySet) { |
| + if (entry != null) { |
| + if (entry.getValue() != null) { |
| + count = entry.getValue(); |
| + } |
| + if (count > 0) { |
| + ctx = entry.getKey(); |
| + for (i = 0; i < count; i++) { |
| + ctx.decreaseActiveTransactionCountOnIndexes(); |
| + } |
| + } |
| + } |
| + } |
| + } |
| + |
| + map.clear(); |
| + } |
| } |
| |
| /* |
| @@ -895,36 +760,82 @@ |
| class LogPageFlushThread extends Thread { |
| |
| private LogManager logManager; |
| + /* |
| + * pendingFlushRequests is a map with key as Integer denoting the page |
| + * index. When a (transaction) thread discovers the need to flush a page, it |
| + * puts its Thread object into the corresponding value that is a |
| + * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans |
| + * this map in order of page index (and circling around). The flusher thread |
| + * needs to flush pages in order and waits for a thread to deposit an object |
| + * in the blocking queue corresponding to the next page in order. A request |
| + * to flush a page is conveyed to the flush thread by simply depositing an |
| + * object in to corresponding blocking queue. It is blocking in the sense |
| + * that the flusher thread will continue to wait for an object to arrive in |
| + * the queue. The object itself is ignored by the fliusher and just acts as |
| + * a signal/event that a page needs to be flushed. |
| + */ |
| + private final LinkedBlockingQueue<Object>[] flushRequestQueue; |
| + private final Object[] flushRequests; |
| + private int lastFlushedPageIndex; |
| + private final long groupCommitWaitPeriod; |
| |
| public LogPageFlushThread(LogManager logManager) { |
| this.logManager = logManager; |
| setName("Flusher"); |
| + int numLogPages = logManager.getLogManagerProperties().getNumLogPages(); |
| + this.flushRequestQueue = new LinkedBlockingQueue[numLogPages]; |
| + this.flushRequests = new Object[numLogPages]; |
| + for (int i = 0; i < numLogPages; i++) { |
| + flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1); |
| + flushRequests[i] = new Object(); |
| + } |
| + this.lastFlushedPageIndex = -1; |
| + groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod(); |
| } |
| |
| + public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) { |
| + synchronized (logManager.getLogPage(pageIndex)) { |
| + //return if flushedLSN >= lsn |
| + if (logManager.getLastFlushedLsn().get() >= lsn) { |
| + return; |
| + } |
| + |
| + //put a new request to the queue only if the request on the page is not in the queue. |
| + flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]); |
| + |
| + //return if the request is asynchronous |
| + if (!isSynchronous) { |
| + return; |
| + } |
| + |
| + //wait until there is flush. |
| + boolean isNotified = false; |
| + while (!isNotified) { |
| + try { |
| + logManager.getLogPage(pageIndex).wait(); |
| + isNotified = true; |
| + } catch (InterruptedException e) { |
| + e.printStackTrace(); |
| + } |
| + } |
| + } |
| + } |
| + |
| @Override |
| public void run() { |
| while (true) { |
| try { |
| - int pageToFlush = logManager.getNextPageInSequence(logManager.getLastFlushedPage()); |
| + int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex); |
| |
| - /* |
| - * A wait call on the linkedBLockingQueue. The flusher thread is |
| - * notified when an object is added to the queue. Please note |
| - * that each page has an associated blocking queue. |
| - */ |
| - logManager.getPendingFlushRequests(pageToFlush).take(); |
| + // A wait call on the linkedBLockingQueue. The flusher thread is |
| + // notified when an object is added to the queue. Please note |
| + // that each page has an associated blocking queue. |
| + flushRequestQueue[pageToFlush].take(); |
| |
| - /* |
| - * The LogFlusher was waiting for a page to be marked as a |
| - * candidate for flushing. Now that has happened. The thread |
| - * shall proceed to take a lock on the log page |
| - */ |
| - synchronized (logManager.getLogPages()[pageToFlush]) { |
| + synchronized (logManager.getLogPage(pageToFlush)) { |
| |
| - /* |
| - * lock the internal state of the log manager and create a |
| - * log file if necessary. |
| - */ |
| + // lock the internal state of the log manager and create a |
| + // log file if necessary. |
| int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()); |
| int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get() |
| + logManager.getLogManagerProperties().getLogPageSize()); |
| @@ -936,198 +847,60 @@ |
| logManager.getLogManagerProperties().getLogPageSize()); |
| } |
| |
| - logManager.getLogPage(pageToFlush).flush(); // put the |
| - // content to |
| - // disk, the |
| - // thread still |
| - // has a lock on |
| - // the log page |
| + //#. sleep during the groupCommitWaitTime |
| + sleep(groupCommitWaitPeriod); |
| |
| - /* |
| - * acquire lock on the log manager as we need to update the |
| - * internal bookkeeping data. |
| - */ |
| + //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page. |
| + logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE); |
| |
| - // increment the last flushed lsn. |
| - long lastFlushedLsn = logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties() |
| - .getLogPageSize()); |
| + //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER) |
| + // meaning every one has finished writing logs on this page. |
| + while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) { |
| + sleep(0); |
| + } |
| |
| - /* |
| - * the log manager gains back ownership of the page. this is |
| - * reflected by incrementing the owner count of the page. |
| - * recall that when the page is begin flushed the owner |
| - * count is actually 0 Value of zero implicitly indicates |
| - * that the page is operated upon by the log flusher thread. |
| - */ |
| - logManager.getLogPageOwnershipCount(pageToFlush).incrementAndGet(); |
| + //#. set the logPageOwnerCount to 0 (LOG_FLUSHER) |
| + // meaning it is flushing. |
| + logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER); |
| |
| - /* |
| - * get the number of log buffers that have been written so |
| - * far. A log buffer = number of log pages * size of a log |
| - * page |
| - */ |
| - int numCycles = (int) lastFlushedLsn / logManager.getLogManagerProperties().getLogBufferSize(); |
| - if (lastFlushedLsn % logManager.getLogManagerProperties().getLogBufferSize() == 0) { |
| - numCycles--; |
| - } |
| + // put the content to disk (the thread still has a lock on the log page) |
| + logManager.getLogPage(pageToFlush).flush(); |
| |
| - /* |
| - * Map the log page to a new region in the log file. |
| - */ |
| + // increment the last flushed lsn and lastFlushedPage |
| + logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize()); |
| + lastFlushedPageIndex = pageToFlush; |
| |
| + // decrement activeTxnCountOnIndexes |
| + logManager.decrementActiveTxnCountOnIndexes(pageToFlush); |
| + |
| + // reset the count to 1 |
| + logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER); |
| + |
| + // Map the log page to a new region in the log file. |
| long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition() |
| + logManager.getLogManagerProperties().getLogBufferSize(); |
| |
| - /* |
| - * long nextPos = (numCycles + 1) |
| - * logManager.getLogManagerProperties() .getLogBufferSize() |
| - * + pageToFlush logManager.getLogManagerProperties() |
| - * .getLogPageSize(); |
| - */ |
| logManager.resetLogPage(nextWritePosition, pageToFlush); |
| |
| // mark the page as ACTIVE |
| logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE); |
| |
| - // notify all waiting (transaction) threads. |
| - // Transaction thread may be waiting for the page to be |
| - // available or may have a commit log record on the page |
| - // that got flushed. |
| - logManager.getLogPages()[pageToFlush].notifyAll(); |
| - logManager.setLastFlushedPage(pageToFlush); |
| + //#. checks the queue whether there is another flush request on the same log buffer |
| + // If there is another request, then simply remove it. |
| + if (flushRequestQueue[pageToFlush].peek() != null) { |
| + flushRequestQueue[pageToFlush].take(); |
| + } |
| |
| + // notify all waiting (transaction) threads. |
| + logManager.getLogPage(pageToFlush).notifyAll(); |
| } |
| } catch (IOException ioe) { |
| ioe.printStackTrace(); |
| throw new Error(" exception in flushing log page", ioe); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| - break; // must break from the loop as the exception indicates |
| - // some thing horrendous has happened elsewhere |
| + break; |
| } |
| } |
| } |
| -} |
| - |
| -/* |
| - * TODO: By default the commit policy is to commit at each request and not have |
| - * a group commit. The following code needs to change to support group commit. |
| - * The code for group commit has not been tested thoroughly and is under |
| - * development. |
| - */ |
| -class BasicCommitResolver implements ICommitResolver { |
| - |
| - public boolean shouldCommitPage(int pageIndex, LogManager logManager, |
| - CommitRequestStatistics commitRequestStatistics) { |
| - return true; |
| - } |
| - |
| - public void init(LogManager logManager) { |
| - } |
| -} |
| - |
| -class GroupCommitResolver implements ICommitResolver { |
| - |
| - public boolean shouldCommitPage(int pageIndex, LogManager logManager, |
| - CommitRequestStatistics commitRequestStatistics) { |
| - long maxCommitWait = logManager.getLogManagerProperties().getGroupCommitWaitPeriod(); |
| - long timestamp = commitRequestStatistics.getPageLevelLastCommitRequestTimestamp(pageIndex); |
| - if (timestamp == -1) { |
| - if (maxCommitWait == 0) { |
| - return true; |
| - } else { |
| - timestamp = System.currentTimeMillis(); |
| - } |
| - } |
| - long currenTime = System.currentTimeMillis(); |
| - if (currenTime - timestamp > maxCommitWait) { |
| - return true; |
| - } |
| - return false; |
| - } |
| - |
| - public void init(LogManager logManager) { |
| - GroupCommitHandlerThread groupCommitHandler = new GroupCommitHandlerThread(logManager); |
| - groupCommitHandler.setDaemon(true); |
| - groupCommitHandler.start(); |
| - } |
| - |
| - class GroupCommitHandlerThread extends Thread { |
| - |
| - private LogManager logManager; |
| - |
| - public GroupCommitHandlerThread(LogManager logManager) { |
| - this.logManager = logManager; |
| - setName("Group Commit Handler"); |
| - } |
| - |
| - @Override |
| - public void run() { |
| - int pageIndex = -1; |
| - while (true) { |
| - pageIndex = logManager.getNextPageInSequence(pageIndex); |
| - long lastCommitRequeestTimestamp = logManager.getCommitRequestStatistics() |
| - .getPageLevelLastCommitRequestTimestamp(pageIndex); |
| - if (lastCommitRequeestTimestamp != -1 |
| - && System.currentTimeMillis() - lastCommitRequeestTimestamp > logManager |
| - .getLogManagerProperties().getGroupCommitWaitPeriod()) { |
| - int dirtyCount = logManager.getLogPageOwnershipCount(pageIndex).decrementAndGet(); |
| - if (dirtyCount == 0) { |
| - try { |
| - logManager.getLogPageStatus(pageIndex).set(LogManager.PageState.INACTIVE); |
| - logManager.getPendingFlushRequests(pageIndex).put(Thread.currentThread()); |
| - } catch (InterruptedException e) { |
| - e.printStackTrace(); |
| - break; |
| - } |
| - logManager.getCommitRequestStatistics().committedPage(pageIndex); |
| - } |
| - } |
| - } |
| - } |
| - } |
| - |
| -} |
| - |
| -interface ICommitResolver { |
| - public boolean shouldCommitPage(int pageIndex, LogManager logManager, |
| - CommitRequestStatistics commitRequestStatistics); |
| - |
| - public void init(LogManager logManager); |
| -} |
| - |
| -/** |
| - * Represents a collection of all commit requests by transactions for each log |
| - * page. The requests are accumulated until the commit policy triggers a flush |
| - * of the corresponding log page. Upon a flush of a page, all commit requests |
| - * for the page are cleared. |
| - */ |
| -class CommitRequestStatistics { |
| - |
| - AtomicInteger[] pageLevelCommitRequestCount; |
| - AtomicLong[] pageLevelLastCommitRequestTimestamp; |
| - |
| - public CommitRequestStatistics(int numPages) { |
| - pageLevelCommitRequestCount = new AtomicInteger[numPages]; |
| - pageLevelLastCommitRequestTimestamp = new AtomicLong[numPages]; |
| - for (int i = 0; i < numPages; i++) { |
| - pageLevelCommitRequestCount[i] = new AtomicInteger(0); |
| - pageLevelLastCommitRequestTimestamp[i] = new AtomicLong(-1L); |
| - } |
| - } |
| - |
| - public void registerCommitRequest(int pageIndex) { |
| - pageLevelCommitRequestCount[pageIndex].incrementAndGet(); |
| - pageLevelLastCommitRequestTimestamp[pageIndex].set(System.currentTimeMillis()); |
| - } |
| - |
| - public long getPageLevelLastCommitRequestTimestamp(int pageIndex) { |
| - return pageLevelLastCommitRequestTimestamp[pageIndex].get(); |
| - } |
| - |
| - public void committedPage(int pageIndex) { |
| - pageLevelCommitRequestCount[pageIndex].set(0); |
| - pageLevelLastCommitRequestTimestamp[pageIndex].set(-1L); |
| - } |
| - |
| -} |
| +} |
| \ No newline at end of file |
| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (working copy) |
| @@ -152,6 +152,9 @@ |
| case LogType.UPDATE: |
| logTypeDisplay = "UPDATE"; |
| break; |
| + case LogType.ENTITY_COMMIT: |
| + logTypeDisplay = "ENTITY_COMMIT"; |
| + break; |
| } |
| builder.append(" LSN : ").append(logicalLogLocator.getLsn()); |
| builder.append(" Log Type : ").append(logTypeDisplay); |
| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (working copy) |
| @@ -18,5 +18,6 @@ |
| |
| public static final byte UPDATE = 0; |
| public static final byte COMMIT = 1; |
| + public static final byte ENTITY_COMMIT = 2; |
| |
| } |
| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (working copy) |
| @@ -1,5 +1,5 @@ |
| /* |
| - * Copyright 2009-2010 by The Regents of the University of California |
| + * Copyright 2009-2012 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| @@ -41,7 +41,7 @@ |
| private int logPageSize = 128 * 1024; // 128 KB |
| private int numLogPages = 8; // number of log pages in the log buffer. |
| |
| - private long groupCommitWaitPeriod = 0; // time in milliseconds for which a |
| + private long groupCommitWaitPeriod = 1; // time in milliseconds for which a |
| // commit record will wait before |
| // the housing page is marked for |
| // flushing. |
| Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java |
| =================================================================== |
| --- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (revision 1194) |
| +++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (working copy) |
| @@ -184,6 +184,7 @@ |
| break; |
| |
| case LogType.COMMIT: |
| + case LogType.ENTITY_COMMIT: |
| tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), |
| logRecordHelper.getDatasetId(currentLogLocator), |
| logRecordHelper.getPKHashValue(currentLogLocator)); |
| @@ -218,6 +219,7 @@ |
| IIndex index = null; |
| LocalResource localResource = null; |
| ILocalResourceMetadata localResourceMetadata = null; |
| + List<Long> resourceIdList = new ArrayList<Long>(); |
| |
| //#. get indexLifeCycleManager |
| IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); |
| @@ -272,6 +274,8 @@ |
| index = localResourceMetadata.createIndexInstance(appRuntimeContext, |
| localResource.getResourceName(), localResource.getPartition()); |
| indexLifecycleManager.register(resourceId, index); |
| + indexLifecycleManager.open(resourceId); |
| + resourceIdList.add(resourceId); |
| } |
| |
| /***************************************************/ |
| @@ -300,6 +304,7 @@ |
| break; |
| |
| case LogType.COMMIT: |
| + case LogType.ENTITY_COMMIT: |
| //do nothing |
| break; |
| |
| @@ -308,6 +313,11 @@ |
| } |
| } |
| |
| + //close all indexes |
| + for (long r : resourceIdList) { |
| + indexLifecycleManager.close(r); |
| + } |
| + |
| JobIdFactory.initJobId(maxJobId); |
| } |
| |
| @@ -539,6 +549,7 @@ |
| break; |
| |
| case LogType.COMMIT: |
| + case LogType.ENTITY_COMMIT: |
| undoLSNSet = loserTxnTable.get(tempKeyTxnId); |
| if (undoLSNSet != null) { |
| loserTxnTable.remove(tempKeyTxnId); |
| Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java |
| =================================================================== |
| --- asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (revision 1194) |
| +++ asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (working copy) |
| @@ -42,6 +42,16 @@ |
| List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit(); |
| for (CompilationUnit cUnit : cUnits) { |
| File testFile = tcCtx.getTestFile(cUnit); |
| + |
| + /***************** |
| + if (!testFile.getAbsolutePath().contains("meta09.aql")) { |
| + System.out.println(testFile.getAbsolutePath()); |
| + continue; |
| + } |
| + System.out.println(testFile.getAbsolutePath()); |
| + *****************/ |
| + |
| + |
| File expectedResultFile = tcCtx.getExpectedResultFile(cUnit); |
| File actualFile = new File(PATH_ACTUAL + File.separator |
| + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm"); |
| Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java |
| =================================================================== |
| --- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1194) |
| +++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy) |
| @@ -95,9 +95,10 @@ |
| File testFile = tcCtx.getTestFile(cUnit); |
| |
| /*************** to avoid run failure cases **************** |
| - if (!testFile.getAbsolutePath().contains("index-selection/")) { |
| + if (!testFile.getAbsolutePath().contains("query-issue205.aql")) { |
| continue; |
| } |
| + System.out.println(testFile.getAbsolutePath()); |
| ************************************************************/ |
| |
| File expectedResultFile = tcCtx.getExpectedResultFile(cUnit); |
| Index: diff_file |
| =================================================================== |
| --- diff_file (revision 1194) |
| +++ diff_file (working copy) |
| @@ -1,2098 +1,1252 @@ |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java |
| =================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy) |
| -@@ -25,8 +25,11 @@ |
| - import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes; |
| - import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes; |
| - import edu.uci.ics.asterix.metadata.entities.Dataverse; |
| -+import edu.uci.ics.asterix.om.base.AInt32; |
| -+import edu.uci.ics.asterix.om.base.AMutableInt32; |
| - import edu.uci.ics.asterix.om.base.ARecord; |
| - import edu.uci.ics.asterix.om.base.AString; |
| -+import edu.uci.ics.asterix.om.types.BuiltinType; |
| - import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer; |
| - import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference; |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (working copy) |
| +@@ -103,12 +103,14 @@ |
| + //for entity-level commit |
| + if (PKHashVal != -1) { |
| + transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true); |
| ++ /***************************** |
| + try { |
| + //decrease the transaction reference count on index |
| + txnContext.decreaseActiveTransactionCountOnIndexes(); |
| + } catch (HyracksDataException e) { |
| + throw new ACIDException("failed to complete index operation", e); |
| + } |
| ++ *****************************/ |
| + return; |
| + } |
| |
| -@@ -40,12 +43,18 @@ |
| - // Payload field containing serialized Dataverse. |
| - public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1; |
| - |
| -+ private AMutableInt32 aInt32; |
| -+ protected ISerializerDeserializer<AInt32> aInt32Serde; |
| -+ |
| - @SuppressWarnings("unchecked") |
| - private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE |
| - .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE); |
| - |
| -+ @SuppressWarnings("unchecked") |
| - public DataverseTupleTranslator(boolean getTuple) { |
| - super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount()); |
| -+ aInt32 = new AMutableInt32(-1); |
| -+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); |
| - } |
| - |
| - @Override |
| -@@ -57,7 +66,8 @@ |
| - DataInput in = new DataInputStream(stream); |
| - ARecord dataverseRecord = recordSerDes.deserialize(in); |
| - return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(), |
| -- ((AString) dataverseRecord.getValueByPos(1)).getStringValue()); |
| -+ ((AString) dataverseRecord.getValueByPos(1)).getStringValue(), |
| -+ ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue()); |
| - } |
| - |
| - @Override |
| -@@ -88,6 +98,12 @@ |
| - stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| - recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue); |
| - |
| -+ // write field 3 |
| -+ fieldValue.reset(); |
| -+ aInt32.setValue(instance.getPendingOp()); |
| -+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput()); |
| -+ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue); |
| -+ |
| - recordBuilder.write(tupleBuilder.getDataOutput(), true); |
| - tupleBuilder.addFieldEndOffset(); |
| - |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java |
| =================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy) |
| -@@ -77,9 +77,9 @@ |
| - protected ISerializerDeserializer<AInt32> aInt32Serde; |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (working copy) |
| +@@ -19,6 +19,7 @@ |
| + import java.util.HashSet; |
| + import java.util.List; |
| + import java.util.Set; |
| ++import java.util.concurrent.atomic.AtomicInteger; |
| |
| - @SuppressWarnings("unchecked") |
| -- public DatasetTupleTranslator(boolean getTuple) { |
| -+ public DatasetTupleTranslator(boolean getTuple) { |
| - super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount()); |
| -- aInt32 = new AMutableInt32(-1); |
| -+ aInt32 = new AMutableInt32(-1); |
| - aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); |
| - } |
| - |
| -@@ -104,8 +104,10 @@ |
| - .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue(); |
| - DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue()); |
| - IDatasetDetails datasetDetails = null; |
| -- int datasetId = ((AInt32) datasetRecord |
| -+ int datasetId = ((AInt32) datasetRecord |
| - .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue(); |
| -+ int pendingOp = ((AInt32) datasetRecord |
| -+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue(); |
| - switch (datasetType) { |
| - case FEED: |
| - case INTERNAL: { |
| -@@ -197,7 +199,7 @@ |
| - } |
| - datasetDetails = new ExternalDatasetDetails(adapter, properties); |
| + import edu.uci.ics.asterix.transaction.management.exception.ACIDException; |
| + import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback; |
| +@@ -169,5 +170,14 @@ |
| + closeable.close(this); |
| } |
| -- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId); |
| -+ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp); |
| } |
| ++ |
| ++ @Override |
| ++ public int hashCode() { |
| ++ return jobId.getId(); |
| ++ } |
| |
| - @Override |
| -@@ -248,13 +250,19 @@ |
| - aString.setValue(Calendar.getInstance().getTime().toString()); |
| - stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| - recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue); |
| -- |
| -+ |
| - // write field 8 |
| - fieldValue.reset(); |
| - aInt32.setValue(dataset.getDatasetId()); |
| - aInt32Serde.serialize(aInt32, fieldValue.getDataOutput()); |
| - recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue); |
| -- |
| -+ |
| -+ // write field 9 |
| -+ fieldValue.reset(); |
| -+ aInt32.setValue(dataset.getPendingOp()); |
| -+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput()); |
| -+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue); |
| -+ |
| - // write record |
| - recordBuilder.write(tupleBuilder.getDataOutput(), true); |
| - tupleBuilder.addFieldEndOffset(); |
| -@@ -290,13 +298,15 @@ |
| - fieldValue.reset(); |
| - aString.setValue(name); |
| - stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| -- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue); |
| -+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, |
| -+ fieldValue); |
| - |
| - // write field 1 |
| - fieldValue.reset(); |
| - aString.setValue(value); |
| - stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| -- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue); |
| -+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, |
| -+ fieldValue); |
| - |
| - propertyRecordBuilder.write(out, true); |
| - } |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java |
| ++ @Override |
| ++ public boolean equals(Object o) { |
| ++ return (o == this); |
| ++ } |
| + } |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java |
| =================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy) |
| -@@ -96,13 +96,15 @@ |
| - } |
| - Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX)) |
| - .getBoolean(); |
| -+ int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX)) |
| -+ .getIntegerValue(); |
| - // Check if there is a gram length as well. |
| - int gramLength = -1; |
| - int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME); |
| - if (gramLenPos >= 0) { |
| - gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue(); |
| - } |
| -- return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex); |
| -+ return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp); |
| - } |
| - |
| - @Override |
| -@@ -174,7 +176,12 @@ |
| - stringSerde.serialize(aString, fieldValue.getDataOutput()); |
| - recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue); |
| - |
| -- // write optional field 7 |
| -+ // write field 7 |
| -+ fieldValue.reset(); |
| -+ intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput()); |
| -+ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue); |
| -+ |
| -+ // write optional field 8 |
| - if (instance.getGramLength() > 0) { |
| - fieldValue.reset(); |
| - nameValue.reset(); |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (working copy) |
| +@@ -567,7 +567,7 @@ |
| + if (commitFlag) { |
| + if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) { |
| + try { |
| +- txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(), |
| ++ txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(), |
| + entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator); |
| + } catch (ACIDException e) { |
| + try { |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java |
| =================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy) |
| -@@ -129,7 +129,7 @@ |
| - |
| - public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> { |
| - private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName()); |
| -- private final MetadataTransactionContext mdTxnCtx; |
| -+ private MetadataTransactionContext mdTxnCtx; |
| - private boolean isWriteTransaction; |
| - private Map<String, String[]> stores; |
| - private Map<String, String> config; |
| -@@ -156,8 +156,7 @@ |
| - return config; |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (working copy) |
| +@@ -75,6 +75,7 @@ |
| + buffer.position(0); |
| + buffer.limit(size); |
| + fileChannel.write(buffer); |
| ++ fileChannel.force(false); |
| + erase(); |
| } |
| |
| -- public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) { |
| -- this.mdTxnCtx = mdTxnCtx; |
| -+ public AqlMetadataProvider(Dataverse defaultDataverse) { |
| - this.defaultDataverse = defaultDataverse; |
| - this.stores = AsterixProperties.INSTANCE.getStores(); |
| - } |
| -@@ -181,6 +180,10 @@ |
| - public void setWriterFactory(IAWriterFactory writerFactory) { |
| - this.writerFactory = writerFactory; |
| - } |
| -+ |
| -+ public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) { |
| -+ this.mdTxnCtx = mdTxnCtx; |
| -+ } |
| - |
| - public MetadataTransactionContext getMetadataTxnContext() { |
| - return mdTxnCtx; |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java |
| =================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy) |
| -@@ -35,15 +35,18 @@ |
| - private final DatasetType datasetType; |
| - private IDatasetDetails datasetDetails; |
| - private final int datasetId; |
| -+ // Type of pending operations with respect to atomic DDL operation |
| -+ private final int pendingOp; |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (working copy) |
| +@@ -1,5 +1,5 @@ |
| + /* |
| +- * Copyright 2009-2010 by The Regents of the University of California |
| ++ * Copyright 2009-2012 by The Regents of the University of California |
| + * Licensed under the Apache License, Version 2.0 (the "License"); |
| + * you may not use this file except in compliance with the License. |
| + * you may obtain a copy of the License from |
| +@@ -21,7 +21,12 @@ |
| + import java.io.RandomAccessFile; |
| + import java.nio.ByteBuffer; |
| + import java.nio.channels.FileChannel; |
| ++import java.util.ArrayList; |
| ++import java.util.HashMap; |
| ++import java.util.List; |
| ++import java.util.Map; |
| + import java.util.Properties; |
| ++import java.util.Set; |
| + import java.util.concurrent.LinkedBlockingQueue; |
| + import java.util.concurrent.atomic.AtomicInteger; |
| + import java.util.concurrent.atomic.AtomicLong; |
| +@@ -30,22 +35,25 @@ |
| |
| - public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails, |
| -- DatasetType datasetType, int datasetId) { |
| -+ DatasetType datasetType, int datasetId, int pendingOp) { |
| - this.dataverseName = dataverseName; |
| - this.datasetName = datasetName; |
| - this.itemTypeName = itemTypeName; |
| - this.datasetType = datasetType; |
| - this.datasetDetails = datasetDetails; |
| - this.datasetId = datasetId; |
| -+ this.pendingOp = pendingOp; |
| - } |
| + import edu.uci.ics.asterix.transaction.management.exception.ACIDException; |
| + import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject; |
| ++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus; |
| ++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState; |
| + import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext; |
| + import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants; |
| + import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem; |
| ++import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| |
| - public String getDataverseName() { |
| -@@ -73,6 +76,10 @@ |
| - public int getDatasetId() { |
| - return datasetId; |
| - } |
| -+ |
| -+ public int getPendingOp() { |
| -+ return pendingOp; |
| -+ } |
| + public class LogManager implements ILogManager { |
| |
| - @Override |
| - public Object addToCache(MetadataCache cache) { |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java |
| -=================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy) |
| -@@ -45,9 +45,11 @@ |
| - private final boolean isPrimaryIndex; |
| - // Specific to NGRAM indexes. |
| - private final int gramLength; |
| -+ // Type of pending operations with respect to atomic DDL operation |
| -+ private final int pendingOp; |
| + public static final boolean IS_DEBUG_MODE = false;//true |
| + private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName()); |
| +- private TransactionSubsystem provider; |
| ++ private final TransactionSubsystem provider; |
| + private LogManagerProperties logManagerProperties; |
| ++ private LogPageFlushThread logPageFlusher; |
| |
| - public Index(String dataverseName, String datasetName, String indexName, IndexType indexType, |
| -- List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) { |
| -+ List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) { |
| - this.dataverseName = dataverseName; |
| - this.datasetName = datasetName; |
| - this.indexName = indexName; |
| -@@ -55,10 +57,11 @@ |
| - this.keyFieldNames = keyFieldNames; |
| - this.gramLength = gramLength; |
| - this.isPrimaryIndex = isPrimaryIndex; |
| -+ this.pendingOp = pendingOp; |
| - } |
| + /* |
| + * the array of log pages. The number of log pages is configurable. Pages |
| + * taken together form an in-memory log buffer. |
| + */ |
| +- |
| + private IFileBasedBuffer[] logPages; |
| |
| - public Index(String dataverseName, String datasetName, String indexName, IndexType indexType, |
| -- List<String> keyFieldNames, boolean isPrimaryIndex) { |
| -+ List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) { |
| - this.dataverseName = dataverseName; |
| - this.datasetName = datasetName; |
| - this.indexName = indexName; |
| -@@ -66,6 +69,7 @@ |
| - this.keyFieldNames = keyFieldNames; |
| - this.gramLength = -1; |
| - this.isPrimaryIndex = isPrimaryIndex; |
| -+ this.pendingOp = pendingOp; |
| - } |
| + private ILogRecordHelper logRecordHelper; |
| +@@ -54,6 +62,7 @@ |
| + * Number of log pages that constitute the in-memory log buffer. |
| + */ |
| + private int numLogPages; |
| ++ |
| + /* |
| + * Initially all pages have an owner count of 1 that is the LogManager. When |
| + * a transaction requests to write in a log page, the owner count is |
| +@@ -62,12 +71,11 @@ |
| + * (covering the whole log record). When the content has been put, the log |
| + * manager computes the checksum and puts it after the content. At this |
| + * point, the ownership count is decremented as the transaction is done with |
| +- * using the page. When a page is full, the log manager decrements the count |
| +- * by one indicating that it has released its ownership of the log page. |
| +- * There could be other transaction(s) still owning the page (that is they |
| +- * could still be mid-way putting the log content). When the ownership count |
| +- * eventually reaches zero, the thread responsible for flushing the log page |
| +- * is notified and the page is flushed to disk. |
| ++ * using the page. When a page is requested to be flushed, logPageFlusher |
| ++ * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed) |
| ++ * only if the count is 1(LOG_WRITER: meaning that there is no other |
| ++ * transactions who own the page to write logs.) After flushing the page, |
| ++ * logPageFlusher set this count to 1. |
| + */ |
| + private AtomicInteger[] logPageOwnerCount; |
| |
| - public String getDataverseName() { |
| -@@ -95,6 +99,10 @@ |
| - public boolean isPrimaryIndex() { |
| - return isPrimaryIndex; |
| - } |
| -+ |
| -+ public int getPendingOp() { |
| -+ return pendingOp; |
| -+ } |
| +@@ -78,18 +86,16 @@ |
| |
| - public boolean isSecondaryIndex() { |
| - return !isPrimaryIndex(); |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java |
| -=================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy) |
| -@@ -27,10 +27,12 @@ |
| - // Enforced to be unique within an Asterix cluster.. |
| - private final String dataverseName; |
| - private final String dataFormat; |
| -+ private final int pendingOp; |
| + /* |
| + * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each |
| +- * page is maintained in a map called logPageStatus. A page is ACTIVE when |
| +- * the LogManager can allocate space in the page for writing a log record. |
| +- * Initially all pages are ACTIVE. As transactions fill up space by writing |
| +- * log records, a page may not have sufficient space left for serving a |
| +- * request by a transaction. When this happens, the page is marked INACTIVE. |
| +- * An INACTIVE page with no owners ( logPageOwnerCount.get(<pageIndex>) == |
| +- * 0) indicates that the page must be flushed to disk before any other log |
| +- * record is written on the page.F |
| ++ * page is maintained in logPageStatus. A page is ACTIVE when the LogManager |
| ++ * can allocate space in the page for writing a log record. Initially all |
| ++ * pages are ACTIVE. As transactions fill up space by writing log records, |
| ++ * a page may not have sufficient space left for serving a request by a |
| ++ * transaction. When this happens, the page is flushed to disk by calling |
| ++ * logPageFlusher.requestFlush(). In the requestFlush(), after groupCommitWaitTime, |
| ++ * the page status is set to INACTIVE. Then, there is no more writer on the |
| ++ * page(meaning the corresponding logPageOwnerCount is 1), the page is flushed |
| ++ * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher. |
| + */ |
| +- |
| +- // private Map<Integer, Integer> logPageStatus = new |
| +- // ConcurrentHashMap<Integer, Integer>(); |
| + private AtomicInteger[] logPageStatus; |
| |
| -- public Dataverse(String dataverseName, String format) { |
| -+ public Dataverse(String dataverseName, String format, int pendingOp) { |
| - this.dataverseName = dataverseName; |
| - this.dataFormat = format; |
| -+ this.pendingOp = pendingOp; |
| + static class PageState { |
| +@@ -98,41 +104,8 @@ |
| } |
| |
| - public String getDataverseName() { |
| -@@ -40,6 +42,10 @@ |
| - public String getDataFormat() { |
| - return dataFormat; |
| - } |
| -+ |
| -+ public int getPendingOp() { |
| -+ return pendingOp; |
| -+ } |
| + private AtomicLong lastFlushedLsn = new AtomicLong(-1); |
| +- private AtomicInteger lastFlushedPage = new AtomicInteger(-1); |
| |
| - @Override |
| - public Object addToCache(MetadataCache cache) { |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java |
| -=================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy) |
| -@@ -25,6 +25,7 @@ |
| - import edu.uci.ics.asterix.common.exceptions.AsterixException; |
| - import edu.uci.ics.asterix.common.functions.FunctionSignature; |
| - import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider; |
| -+import edu.uci.ics.asterix.metadata.api.IMetadataEntity; |
| - import edu.uci.ics.asterix.metadata.api.IMetadataIndex; |
| - import edu.uci.ics.asterix.metadata.api.IMetadataNode; |
| - import edu.uci.ics.asterix.metadata.api.IValueExtractor; |
| -@@ -160,7 +161,7 @@ |
| - // Add the primary index for the dataset. |
| - InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails(); |
| - Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(), |
| -- dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true); |
| -+ dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp()); |
| - addIndex(jobId, primaryIndex); |
| - ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(), |
| - dataset.getDatasetName()); |
| -@@ -260,7 +261,7 @@ |
| - IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, |
| - NoOpOperationCallback.INSTANCE); |
| - TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId); |
| -- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx); |
| -+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx); |
| - // TODO: fix exceptions once new BTree exception model is in hyracks. |
| - indexAccessor.insert(tuple); |
| - //TODO: extract the key from the tuple and get the PKHashValue from the key. |
| -@@ -536,7 +537,7 @@ |
| - // The transaction with txnId will have an S lock on the |
| - // resource. Note that lock converters have a higher priority than |
| - // regular waiters in the LockManager. |
| -- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx); |
| -+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx); |
| - indexAccessor.delete(tuple); |
| - //TODO: extract the key from the tuple and get the PKHashValue from the key. |
| - //check how to get the oldValue. |
| -@@ -803,7 +804,9 @@ |
| - private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey, |
| - IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception { |
| - TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId); |
| -- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx); |
| -+ //#. currently lock is not needed to access any metadata |
| -+ // since the non-compatible concurrent access is always protected by the latch in the MetadataManager. |
| -+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx); |
| - IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory(); |
| - long resourceID = index.getResourceID(); |
| - IIndex indexInstance = indexLifecycleManager.getIndex(resourceID); |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java |
| -=================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy) |
| -@@ -20,6 +20,11 @@ |
| - import edu.uci.ics.asterix.metadata.MetadataCache; |
| - |
| - public interface IMetadataEntity extends Serializable { |
| -+ |
| -+ public static final int PENDING_NO_OP = 0; |
| -+ public static final int PENDING_ADD_OP = 1; |
| -+ public static final int PENDING_DROP_OP = 2; |
| -+ |
| - Object addToCache(MetadataCache cache); |
| - |
| - Object dropFromCache(MetadataCache cache); |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java |
| -=================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy) |
| -@@ -17,6 +17,8 @@ |
| - |
| - import java.rmi.RemoteException; |
| - import java.util.List; |
| -+import java.util.concurrent.locks.ReadWriteLock; |
| -+import java.util.concurrent.locks.ReentrantReadWriteLock; |
| - |
| - import edu.uci.ics.asterix.common.functions.FunctionSignature; |
| - import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy; |
| -@@ -79,11 +81,10 @@ |
| - public class MetadataManager implements IMetadataManager { |
| - // Set in init(). |
| - public static MetadataManager INSTANCE; |
| + /* |
| +- * pendingFlushRequests is a map with key as Integer denoting the page |
| +- * index. When a (transaction) thread discovers the need to flush a page, it |
| +- * puts its Thread object into the corresponding value that is a |
| +- * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans |
| +- * this map in order of page index (and circling around). The flusher thread |
| +- * needs to flush pages in order and waits for a thread to deposit an object |
| +- * in the blocking queue corresponding to the next page in order. A request |
| +- * to flush a page is conveyed to the flush thread by simply depositing an |
| +- * object in to corresponding blocking queue. It is blocking in the sense |
| +- * that the flusher thread will continue to wait for an object to arrive in |
| +- * the queue. The object itself is ignored by the fliusher and just acts as |
| +- * a signal/event that a page needs to be flushed. |
| +- */ |
| - |
| - private final MetadataCache cache = new MetadataCache(); |
| - private IAsterixStateProxy proxy; |
| - private IMetadataNode metadataNode; |
| +- private LinkedBlockingQueue[] pendingFlushRequests; |
| - |
| -+ |
| - public MetadataManager(IAsterixStateProxy proxy) { |
| - if (proxy == null) { |
| - throw new Error("Null proxy given to MetadataManager."); |
| -@@ -206,11 +207,14 @@ |
| +- /* |
| +- * ICommitResolver is an interface that provides an API that can answer a |
| +- * simple boolean - Given the commit requests so far, should a page be |
| +- * flushed. The implementation of the interface contains the logic (or you |
| +- * can say the policy) for commit. It could be group commit in which case |
| +- * the commit resolver may not return a true indicating that it wishes to |
| +- * delay flushing of the page. |
| +- */ |
| +- private ICommitResolver commitResolver; |
| +- |
| +- /* |
| +- * An object that keeps track of the submitted commit requests. |
| +- */ |
| +- private CommitRequestStatistics commitRequestStatistics; |
| +- |
| +- /* |
| + * When the transaction eco-system comes to life, the log manager positions |
| + * itself to the end of the last written log. the startingLsn represent the |
| + * lsn value of the next log record to be written after a system (re)start. |
| +@@ -146,16 +119,10 @@ |
| + */ |
| + private AtomicLong lsn = new AtomicLong(0); |
| |
| - @Override |
| - public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException { |
| -+ // add dataset into metadataNode |
| - try { |
| - metadataNode.addDataset(ctx.getJobId(), dataset); |
| - } catch (RemoteException e) { |
| - throw new MetadataException(e); |
| - } |
| -+ |
| -+ // reflect the dataset into the cache |
| - ctx.addDataset(dataset); |
| +- /* |
| +- * A map that tracks the flush requests submitted for each page. The |
| +- * requests for a page are cleared when the page is flushed. |
| +- */ |
| +- public LinkedBlockingQueue<Thread> getPendingFlushRequests(int pageIndex) { |
| +- return pendingFlushRequests[pageIndex]; |
| +- } |
| ++ private List<HashMap<TransactionContext, Integer>> activeTxnCountMaps; |
| + |
| +- public void addFlushRequest(int pageIndex) { |
| +- pendingFlushRequests[pageIndex].add(pendingFlushRequests); |
| ++ public void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) { |
| ++ logPageFlusher.requestFlush(pageIndex, lsn, isSynchronous); |
| } |
| |
| -@@ -585,4 +589,5 @@ |
| + public AtomicLong getLastFlushedLsn() { |
| +@@ -233,19 +200,12 @@ |
| + numLogPages = logManagerProperties.getNumLogPages(); |
| + logPageOwnerCount = new AtomicInteger[numLogPages]; |
| + logPageStatus = new AtomicInteger[numLogPages]; |
| +- pendingFlushRequests = new LinkedBlockingQueue[numLogPages]; |
| +- if (logManagerProperties.getGroupCommitWaitPeriod() > 0) { // configure |
| +- // the |
| +- // Commit |
| +- // Resolver |
| +- commitResolver = new GroupCommitResolver(); // Group Commit is |
| +- // enabled |
| +- commitRequestStatistics = new CommitRequestStatistics(numLogPages); |
| +- } else { |
| +- commitResolver = new BasicCommitResolver(); // the basic commit |
| +- // resolver |
| ++ |
| ++ activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages); |
| ++ for (int i = 0; i < numLogPages; i++) { |
| ++ activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>()); |
| } |
| - return adapter; |
| - } |
| +- this.commitResolver.init(this); // initialize the commit resolver |
| + |
| - } |
| -\ No newline at end of file |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java |
| -=================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy) |
| -@@ -19,6 +19,7 @@ |
| + logPages = new FileBasedBuffer[numLogPages]; |
| |
| - import edu.uci.ics.asterix.common.functions.FunctionSignature; |
| - import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier; |
| -+import edu.uci.ics.asterix.metadata.api.IMetadataEntity; |
| - import edu.uci.ics.asterix.metadata.entities.Dataset; |
| - import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter; |
| - import edu.uci.ics.asterix.metadata.entities.Datatype; |
| -@@ -104,19 +105,19 @@ |
| - } |
| + /* |
| +@@ -264,7 +224,6 @@ |
| + for (int i = 0; i < numLogPages; i++) { |
| + logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER); |
| + logPageStatus[i] = new AtomicInteger(PageState.ACTIVE); |
| +- pendingFlushRequests[i] = new LinkedBlockingQueue<Thread>(); |
| + } |
| |
| - public void dropDataset(String dataverseName, String datasetName) { |
| -- Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1); |
| -+ Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP); |
| - droppedCache.addDatasetIfNotExists(dataset); |
| - logAndApply(new MetadataLogicalOperation(dataset, false)); |
| + /* |
| +@@ -278,9 +237,9 @@ |
| + * daemon thread so that it does not stop the JVM from exiting when all |
| + * other threads are done with their work. |
| + */ |
| +- LogPageFlushThread logFlusher = new LogPageFlushThread(this); |
| +- logFlusher.setDaemon(true); |
| +- logFlusher.start(); |
| ++ logPageFlusher = new LogPageFlushThread(this); |
| ++ logPageFlusher.setDaemon(true); |
| ++ logPageFlusher.start(); |
| } |
| |
| - public void dropIndex(String dataverseName, String datasetName, String indexName) { |
| -- Index index = new Index(dataverseName, datasetName, indexName, null, null, false); |
| -+ Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP); |
| - droppedCache.addIndexIfNotExists(index); |
| - logAndApply(new MetadataLogicalOperation(index, false)); |
| - } |
| - |
| - public void dropDataverse(String dataverseName) { |
| -- Dataverse dataverse = new Dataverse(dataverseName, null); |
| -+ Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP); |
| - droppedCache.addDataverseIfNotExists(dataverse); |
| - logAndApply(new MetadataLogicalOperation(dataverse, false)); |
| - } |
| -@@ -162,7 +163,7 @@ |
| + public int getLogPageIndex(long lsnValue) { |
| +@@ -312,7 +271,7 @@ |
| + */ |
| + private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException { |
| + if (logPageStatus[pageIndex].get() == PageState.ACTIVE |
| +- && getLogPageOwnershipCount(pageIndex).get() >= PageOwnershipStatus.LOG_WRITER) { |
| ++ && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) { |
| + return; |
| } |
| - return droppedCache.getDataset(dataverseName, datasetName) != null; |
| - } |
| -- |
| + try { |
| +@@ -338,47 +297,40 @@ |
| + */ |
| + private long getLsn(int entrySize, byte logType) throws ACIDException { |
| + long pageSize = logManagerProperties.getLogPageSize(); |
| +- boolean requiresFlushing = logType == LogType.COMMIT; |
| + |
| - public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) { |
| - if (droppedCache.getDataverse(dataverseName) != null) { |
| - return true; |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java |
| -=================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy) |
| -@@ -80,10 +80,11 @@ |
| - public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0; |
| - public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1; |
| - public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2; |
| -+ public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3; |
| + while (true) { |
| + boolean forwardPage = false; |
| +- boolean shouldFlushPage = false; |
| + long old = lsn.get(); |
| +- int pageIndex = getLogPageIndex(old); // get the log page |
| +- // corresponding to the |
| +- // current lsn value |
| ++ |
| ++ //get the log page corresponding to the current lsn value |
| ++ int pageIndex = getLogPageIndex(old); |
| + long retVal = old; |
| +- long next = old + entrySize; // the lsn value for the next request, |
| +- // if the current request is served. |
| ++ |
| ++ // the lsn value for the next request if the current request is served. |
| ++ long next = old + entrySize; |
| + int prevPage = -1; |
| +- if ((next - 1) / pageSize != old / pageSize // check if the log |
| +- // record will cross |
| +- // page boundaries, a |
| +- // case that is not |
| +- // allowed. |
| +- || (next % pageSize == 0)) { |
| ++ |
| ++ // check if the log record will cross page boundaries, a case that is not allowed. |
| ++ if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) { |
| ++ |
| + if ((old != 0 && old % pageSize == 0)) { |
| +- retVal = old; // On second thought, this shall never be the |
| +- // case as it means that the lsn is |
| +- // currently at the beginning of a page and |
| +- // we still need to forward the page which |
| +- // means that the entrySize exceeds a log |
| +- // page size. If this is the case, an |
| +- // exception is thrown before calling this |
| +- // API. |
| +- // would remove this case. |
| ++ // On second thought, this shall never be the case as it means that the lsn is |
| ++ // currently at the beginning of a page and we still need to forward the page which |
| ++ // means that the entrySize exceeds a log page size. If this is the case, an |
| ++ // exception is thrown before calling this API. would remove this case. |
| ++ retVal = old; |
| |
| - private static final ARecordType createDataverseRecordType() { |
| -- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" }, |
| -- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true); |
| -+ return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" }, |
| -+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true); |
| - } |
| + } else { |
| +- retVal = ((old / pageSize) + 1) * pageSize; // set the lsn |
| +- // to point to |
| +- // the beginning |
| +- // of the next |
| +- // page. |
| ++ // set the lsn to point to the beginning of the next page. |
| ++ retVal = ((old / pageSize) + 1) * pageSize; |
| + } |
| ++ |
| + next = retVal; |
| +- forwardPage = true; // as the log record shall cross log page |
| +- // boundary, we must re-assign the lsn (so |
| +- // that the log record begins on a different |
| +- // location. |
| ++ |
| ++ // as the log record shall cross log page boundary, we must re-assign the lsn so |
| ++ // that the log record begins on a different location. |
| ++ forwardPage = true; |
| ++ |
| + prevPage = pageIndex; |
| + pageIndex = getNextPageInSequence(pageIndex); |
| + } |
| +@@ -397,109 +349,51 @@ |
| + */ |
| + waitUntillPageIsAvailableForWritingLog(pageIndex); |
| |
| - // Helper constants for accessing fields in an ARecord of anonymous type |
| -@@ -158,10 +159,11 @@ |
| - public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6; |
| - public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7; |
| - public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8; |
| -+ public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9; |
| +- if (!forwardPage && requiresFlushing) { |
| +- shouldFlushPage = commitResolver.shouldCommitPage(pageIndex, this, commitRequestStatistics); |
| +- if (shouldFlushPage) { |
| +- next = ((next / pageSize) + 1) * pageSize; /* |
| +- * next |
| +- * represents the |
| +- * next value of |
| +- * lsn after this |
| +- * log record has |
| +- * been written. |
| +- * If the page |
| +- * needs to be |
| +- * flushed, then |
| +- * we do not give |
| +- * any more LSNs |
| +- * from this |
| +- * page. |
| +- */ |
| +- } |
| +- } |
| +- if (!lsn.compareAndSet(old, next)) { // Atomic call -> returns true |
| +- // only when the value |
| +- // represented by lsn is same as |
| +- // "old". The value is updated |
| +- // to "next". |
| ++ if (!lsn.compareAndSet(old, next)) { |
| ++ // Atomic call -> returns true only when the value represented by lsn is same as |
| ++ // "old". The value is updated to "next". |
| + continue; |
| + } |
| |
| - private static final ARecordType createDatasetRecordType() { |
| - String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails", |
| -- "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" }; |
| -+ "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" }; |
| + if (forwardPage) { |
| +- //TODO |
| +- //this is not safe since the incoming thread may reach the same page slot with this page |
| +- //(differ by the log buffer size) |
| +- logPageStatus[prevPage].set(PageState.INACTIVE); // mark |
| +- // previous |
| +- // page |
| +- // inactive |
| ++ addFlushRequest(prevPage, old, false); |
| |
| - List<IAType> internalRecordUnionList = new ArrayList<IAType>(); |
| - internalRecordUnionList.add(BuiltinType.ANULL); |
| -@@ -179,7 +181,8 @@ |
| - AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null); |
| - |
| - IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, |
| -- internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 }; |
| -+ internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32, |
| -+ BuiltinType.AINT32 }; |
| - return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true); |
| +- /* |
| +- * decrement on the behalf of the log manager. if there are no |
| +- * more owners (count == 0) the page must be marked as a |
| +- * candidate to be flushed. |
| +- */ |
| +- int pageDirtyCount = getLogPageOwnershipCount(prevPage).decrementAndGet(); |
| +- if (pageDirtyCount == 0) { |
| +- addFlushRequest(prevPage); |
| +- } |
| +- |
| +- /* |
| +- * The transaction thread that discovers the need to forward a |
| +- * page is made to re-acquire a lsn. |
| +- */ |
| ++ // The transaction thread that discovers the need to forward a |
| ++ // page is made to re-acquire a lsn. |
| + continue; |
| ++ |
| + } else { |
| +- /* |
| +- * the transaction thread has been given a space in a log page, |
| +- * but is made to wait until the page is available. |
| +- */ |
| ++ // the transaction thread has been given a space in a log page, |
| ++ // but is made to wait until the page is available. |
| ++ // (Is this needed? when does this wait happen?) |
| + waitUntillPageIsAvailableForWritingLog(pageIndex); |
| +- /* |
| +- * increment the counter as the transaction thread now holds a |
| +- * space in the log page and hence is an owner. |
| +- */ |
| ++ |
| ++ // increment the counter as the transaction thread now holds a |
| ++ // space in the log page and hence is an owner. |
| + logPageOwnerCount[pageIndex].incrementAndGet(); |
| +- } |
| +- if (requiresFlushing) { |
| +- if (!shouldFlushPage) { |
| +- /* |
| +- * the log record requires the page to be flushed but under |
| +- * the commit policy, the flush task has been deferred. The |
| +- * transaction thread submits its request to flush the page. |
| +- */ |
| +- commitRequestStatistics.registerCommitRequest(pageIndex); |
| +- } else { |
| +- /* |
| +- * the flush request was approved by the commit resolver. |
| +- * Thus the page is marked INACTIVE as no more logs will be |
| +- * written on this page. The log manager needs to release |
| +- * its ownership. Note that transaction threads may still |
| +- * continue to be owners of the log page till they fill up |
| +- * the space allocated to them. |
| +- */ |
| +- logPageStatus[pageIndex].set(PageState.INACTIVE); |
| +- logPageOwnerCount[pageIndex].decrementAndGet(); // on |
| +- // the |
| +- // behalf |
| +- // of |
| +- // log |
| +- // manager |
| ++ |
| ++ // Before the count is incremented, if the flusher flushed the allocated page, |
| ++ // then retry to get new LSN. Otherwise, the log with allocated lsn will be lost. |
| ++ if (lastFlushedLsn.get() >= retVal) { |
| ++ logPageOwnerCount[pageIndex].decrementAndGet(); |
| ++ continue; |
| + } |
| + } |
| ++ |
| + return retVal; |
| + } |
| } |
| |
| -@@ -264,13 +267,14 @@ |
| - public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4; |
| - public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5; |
| - public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6; |
| -+ public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7; |
| + @Override |
| +- public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId, |
| ++ public void log(byte logType, TransactionContext txnCtx, int datasetId, int PKHashValue, long resourceId, |
| + byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger, |
| + LogicalLogLocator logicalLogLocator) throws ACIDException { |
| +- /* |
| +- * logLocator is a re-usable object that is appropriately set in each |
| +- * invocation. If the reference is null, the log manager must throw an |
| +- * exception |
| +- */ |
| ++ |
| ++ HashMap<TransactionContext, Integer> map = null; |
| ++ int activeTxnCount; |
| ++ |
| ++ // logLocator is a re-usable object that is appropriately set in each invocation. |
| ++ // If the reference is null, the log manager must throw an exception. |
| + if (logicalLogLocator == null) { |
| + throw new ACIDException( |
| + " you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +" |
| +@@ -519,20 +413,19 @@ |
| |
| - private static final ARecordType createIndexRecordType() { |
| - AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null); |
| - String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey", |
| -- "IsPrimary", "Timestamp" }; |
| -+ "IsPrimary", "Timestamp", "PendingOp" }; |
| - IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, |
| -- olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING }; |
| -+ olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 }; |
| - return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true); |
| - }; |
| + // all constraints checked and we are good to go and acquire a lsn. |
| + long previousLSN = -1; |
| +- long currentLSN; // the will be set to the location (a long value) |
| +- // where the log record needs to be placed. |
| |
| -Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java |
| -=================================================================== |
| ---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061) |
| -+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy) |
| -@@ -31,6 +31,7 @@ |
| - import edu.uci.ics.asterix.metadata.IDatasetDetails; |
| - import edu.uci.ics.asterix.metadata.MetadataManager; |
| - import edu.uci.ics.asterix.metadata.MetadataTransactionContext; |
| -+import edu.uci.ics.asterix.metadata.api.IMetadataEntity; |
| - import edu.uci.ics.asterix.metadata.api.IMetadataIndex; |
| - import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap; |
| - import edu.uci.ics.asterix.metadata.entities.Dataset; |
| -@@ -226,7 +227,7 @@ |
| - public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception { |
| - String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName(); |
| - String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT; |
| -- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat)); |
| -+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP)); |
| - } |
| +- /* |
| +- * The logs written by a transaction need to be linked to each other for |
| +- * a successful rollback/recovery. However there could be multiple |
| +- * threads operating concurrently that are part of a common transaction. |
| +- * These threads need to synchronize and record the lsn corresponding to |
| +- * the last log record written by (any thread of) the transaction. |
| +- */ |
| +- synchronized (context) { |
| +- previousLSN = context.getLastLogLocator().getLsn(); |
| ++ // the will be set to the location (a long value) where the log record needs to be placed. |
| ++ long currentLSN; |
| ++ |
| ++ // The logs written by a transaction need to be linked to each other for |
| ++ // a successful rollback/recovery. However there could be multiple |
| ++ // threads operating concurrently that are part of a common transaction. |
| ++ // These threads need to synchronize and record the lsn corresponding to |
| ++ // the last log record written by (any thread of) the transaction. |
| ++ synchronized (txnCtx) { |
| ++ previousLSN = txnCtx.getLastLogLocator().getLsn(); |
| + currentLSN = getLsn(totalLogSize, logType); |
| +- context.setLastLSN(currentLSN); |
| ++ txnCtx.setLastLSN(currentLSN); |
| + if (IS_DEBUG_MODE) { |
| + System.out.println("--------------> LSN(" + currentLSN + ") is allocated"); |
| + } |
| +@@ -547,48 +440,37 @@ |
| + * performed correctly that is ownership is released. |
| + */ |
| |
| - public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception { |
| -@@ -236,7 +237,7 @@ |
| - primaryIndexes[i].getNodeGroupName()); |
| - MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(), |
| - primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(), |
| -- id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId())); |
| -+ id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP)); |
| - } |
| - } |
| +- boolean decremented = false; // indicates if the transaction thread |
| +- // has release ownership of the |
| +- // page. |
| +- boolean addedFlushRequest = false; // indicates if the transaction |
| +- // thread has submitted a flush |
| +- // request. |
| ++ // indicates if the transaction thread has release ownership of the page. |
| ++ boolean decremented = false; |
| |
| -@@ -267,7 +268,7 @@ |
| - for (int i = 0; i < secondaryIndexes.length; i++) { |
| - MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(), |
| - secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE, |
| -- secondaryIndexes[i].getPartitioningExpr(), false)); |
| -+ secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP)); |
| - } |
| - } |
| + int pageIndex = (int) getLogPageIndex(currentLSN); |
| |
| -Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java |
| -=================================================================== |
| ---- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061) |
| -+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy) |
| -@@ -95,11 +95,11 @@ |
| - File testFile = tcCtx.getTestFile(cUnit); |
| - |
| - /*************** to avoid run failure cases **************** |
| -- if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) { |
| -+ if (!testFile.getAbsolutePath().contains("index-selection/")) { |
| - continue; |
| - } |
| - ************************************************************/ |
| +- /* |
| +- * the lsn has been obtained for the log record. need to set the |
| +- * LogLocator instance accordingly. |
| +- */ |
| - |
| -+ |
| - File expectedResultFile = tcCtx.getExpectedResultFile(cUnit); |
| - File actualFile = new File(PATH_ACTUAL + File.separator |
| - + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm"); |
| -Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java |
| -=================================================================== |
| ---- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061) |
| -+++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy) |
| -@@ -90,7 +90,7 @@ |
| ++ // the lsn has been obtained for the log record. need to set the |
| ++ // LogLocator instance accordingly. |
| + try { |
| +- |
| + logicalLogLocator.setBuffer(logPages[pageIndex]); |
| + int pageOffset = getLogPageOffset(currentLSN); |
| + logicalLogLocator.setMemoryOffset(pageOffset); |
| |
| - private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName()); |
| +- /* |
| +- * write the log header. |
| +- */ |
| +- logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN, |
| ++ // write the log header. |
| ++ logRecordHelper.writeLogHeader(logicalLogLocator, logType, txnCtx, datasetId, PKHashValue, previousLSN, |
| + resourceId, resourceMgrId, logContentSize); |
| |
| -- public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt, |
| -+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt, |
| - AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException, |
| - ACIDException, AsterixException { |
| + // increment the offset so that the transaction can fill up the |
| + // content in the correct region of the allocated space. |
| + logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType)); |
| |
| -@@ -111,67 +111,10 @@ |
| - throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName); |
| - } |
| - if (dataset.getDatasetType() == DatasetType.EXTERNAL) { |
| -- return new JobSpecification[0]; |
| -+ return new JobSpecification(); |
| - } |
| -- |
| -- List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(), |
| -- dataset.getDatasetName()); |
| -- int numSecondaryIndexes = 0; |
| -- for (Index index : datasetIndexes) { |
| -- if (index.isSecondaryIndex()) { |
| -- numSecondaryIndexes++; |
| -- } |
| -- } |
| -- JobSpecification[] specs; |
| -- if (numSecondaryIndexes > 0) { |
| -- specs = new JobSpecification[numSecondaryIndexes + 1]; |
| -- int i = 0; |
| -- // First, drop secondary indexes. |
| -- for (Index index : datasetIndexes) { |
| -- if (index.isSecondaryIndex()) { |
| -- specs[i] = new JobSpecification(); |
| -- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider |
| -- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), |
| -- datasetName, index.getIndexName()); |
| -- IIndexDataflowHelperFactory dfhFactory; |
| -- switch (index.getIndexType()) { |
| -- case BTREE: |
| -- dfhFactory = new LSMBTreeDataflowHelperFactory( |
| -- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, |
| -- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER, |
| -- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER); |
| -- break; |
| -- case RTREE: |
| -- dfhFactory = new LSMRTreeDataflowHelperFactory( |
| -- new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE, |
| -- new IBinaryComparatorFactory[] { null }, |
| -- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, |
| -- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, |
| -- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null); |
| -- break; |
| -- case NGRAM_INVIX: |
| -- case WORD_INVIX: |
| -- dfhFactory = new LSMInvertedIndexDataflowHelperFactory( |
| -- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER, |
| -- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER, |
| -- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER); |
| -- break; |
| -- default: |
| -- throw new AsterixException("Unknown index type provided."); |
| -- } |
| -- IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i], |
| -- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, |
| -- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory); |
| -- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop, |
| -- idxSplitsAndConstraint.second); |
| -- i++; |
| -- } |
| -- } |
| -- } else { |
| -- specs = new JobSpecification[1]; |
| -- } |
| -+ |
| - JobSpecification specPrimary = new JobSpecification(); |
| -- specs[specs.length - 1] = specPrimary; |
| +- // a COMMIT log record does not have any content |
| +- // and hence the logger (responsible for putting the log content) is |
| +- // not invoked. |
| ++ // a COMMIT log record does not have any content and hence |
| ++ // the logger (responsible for putting the log content) is not invoked. |
| + if (logContentSize != 0) { |
| +- logger.preLog(context, reusableLogContentObject); |
| ++ logger.preLog(txnCtx, reusableLogContentObject); |
| + } |
| |
| - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider |
| - .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName, |
| -@@ -187,7 +130,7 @@ |
| + if (logContentSize != 0) { |
| + // call the logger implementation and ask to fill in the log |
| + // record content at the allocated space. |
| +- logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject); |
| +- logger.postLog(context, reusableLogContentObject); |
| ++ logger.log(txnCtx, logicalLogLocator, logContentSize, reusableLogContentObject); |
| ++ logger.postLog(txnCtx, reusableLogContentObject); |
| + if (IS_DEBUG_MODE) { |
| + logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() |
| + - logRecordHelper.getLogHeaderSize(logType)); |
| +@@ -597,10 +479,8 @@ |
| + } |
| + } |
| |
| - specPrimary.addRoot(primaryBtreeDrop); |
| +- /* |
| +- * The log record has been written. For integrity checks, compute |
| +- * the checksum and put it at the end of the log record. |
| +- */ |
| ++ // The log record has been written. For integrity checks, compute |
| ++ // the checksum and put it at the end of the log record. |
| + int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType); |
| + int length = totalLogSize - logRecordHelper.getLogChecksumSize(); |
| + long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length); |
| +@@ -611,46 +491,31 @@ |
| + System.out.println("--------------> LSN(" + currentLSN + ") is written"); |
| + } |
| |
| -- return specs; |
| -+ return specPrimary; |
| - } |
| +- /* |
| +- * release the ownership as the log record has been placed in |
| +- * created space. |
| +- */ |
| +- int pageDirtyCount = logPageOwnerCount[pageIndex].decrementAndGet(); |
| ++ // release the ownership as the log record has been placed in created space. |
| ++ logPageOwnerCount[pageIndex].decrementAndGet(); |
| |
| - public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName, |
| -Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java |
| -=================================================================== |
| ---- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061) |
| -+++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy) |
| -@@ -21,6 +21,8 @@ |
| - import java.util.HashMap; |
| - import java.util.List; |
| - import java.util.Map; |
| -+import java.util.concurrent.locks.ReadWriteLock; |
| -+import java.util.concurrent.locks.ReentrantReadWriteLock; |
| + // indicating that the transaction thread has released ownership |
| + decremented = true; |
| |
| - import org.json.JSONException; |
| +- /* |
| +- * If the transaction thread happens to be the last owner of the log |
| +- * page the page must by marked as a candidate to be flushed. |
| +- */ |
| +- if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) { |
| +- addFlushRequest(pageIndex); |
| +- addedFlushRequest = true; |
| +- } |
| +- |
| +- /* |
| +- * If the log type is commit, a flush request is registered, if the |
| +- * log record has not reached the disk. It may be possible that this |
| +- * thread does not get CPU cycles and in-between the log record has |
| +- * been flushed to disk because the containing log page filled up. |
| +- */ |
| +- if (logType == LogType.COMMIT) { |
| +- synchronized (logPages[pageIndex]) { |
| +- while (getLastFlushedLsn().get() < currentLSN) { |
| +- logPages[pageIndex].wait(); |
| +- } |
| ++ if (logType == LogType.ENTITY_COMMIT) { |
| ++ map = activeTxnCountMaps.get(pageIndex); |
| ++ if (map.containsKey(txnCtx)) { |
| ++ activeTxnCount = (Integer) map.get(txnCtx); |
| ++ activeTxnCount++; |
| ++ map.put(txnCtx, activeTxnCount); |
| ++ } else { |
| ++ map.put(txnCtx, 1); |
| + } |
| ++ addFlushRequest(pageIndex, currentLSN, false); |
| ++ } else if (logType == LogType.COMMIT) { |
| ++ addFlushRequest(pageIndex, currentLSN, true); |
| + } |
| |
| -@@ -68,6 +70,7 @@ |
| - import edu.uci.ics.asterix.metadata.MetadataException; |
| - import edu.uci.ics.asterix.metadata.MetadataManager; |
| - import edu.uci.ics.asterix.metadata.MetadataTransactionContext; |
| -+import edu.uci.ics.asterix.metadata.api.IMetadataEntity; |
| - import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider; |
| - import edu.uci.ics.asterix.metadata.entities.Dataset; |
| - import edu.uci.ics.asterix.metadata.entities.Datatype; |
| -@@ -112,6 +115,7 @@ |
| - private final PrintWriter out; |
| - private final SessionConfig sessionConfig; |
| - private final DisplayFormat pdf; |
| -+ private final ReadWriteLock cacheLatch; |
| - private Dataverse activeDefaultDataverse; |
| - private List<FunctionDecl> declaredFunctions; |
| + } catch (Exception e) { |
| + e.printStackTrace(); |
| +- throw new ACIDException(context, "Thread: " + Thread.currentThread().getName() |
| ++ throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName() |
| + + " logger encountered exception", e); |
| + } finally { |
| +- /* |
| +- * If an exception was encountered and we did not release ownership |
| +- */ |
| + if (!decremented) { |
| + logPageOwnerCount[pageIndex].decrementAndGet(); |
| + } |
| +@@ -667,9 +532,6 @@ |
| |
| -@@ -121,6 +125,7 @@ |
| - this.out = out; |
| - this.sessionConfig = pc; |
| - this.pdf = pdf; |
| -+ this.cacheLatch = new ReentrantReadWriteLock(true); |
| - declaredFunctions = getDeclaredFunctions(aqlStatements); |
| + logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition), |
| + logManagerProperties.getLogPageSize()); |
| +- |
| +- //TODO Check if this is necessary |
| +- //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0); |
| } |
| |
| -@@ -143,8 +148,7 @@ |
| + @Override |
| +@@ -747,16 +609,13 @@ |
| + //minimize memory allocation overhead. current code allocates the log page size per reading a log record. |
| |
| - for (Statement stmt : aqlStatements) { |
| - validateOperation(activeDefaultDataverse, stmt); |
| -- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse); |
| -+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse); |
| - metadataProvider.setWriterFactory(writerFactory); |
| - metadataProvider.setOutputFile(outputFile); |
| - metadataProvider.setConfig(config); |
| -@@ -253,15 +257,9 @@ |
| - } |
| + byte[] pageContent = new byte[logManagerProperties.getLogPageSize()]; |
| +- // take a lock on the log page so that the page is not flushed to |
| +- // disk interim |
| ++ |
| ++ // take a lock on the log page so that the page is not flushed to disk interim |
| + synchronized (logPages[pageIndex]) { |
| +- if (lsnValue > getLastFlushedLsn().get()) { // need to check |
| +- // again |
| +- // (this |
| +- // thread may have got |
| +- // de-scheduled and must |
| +- // refresh!) |
| |
| - } |
| -- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| - } catch (Exception e) { |
| -- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| - throw new AlgebricksException(e); |
| ++ // need to check again (this thread may have got de-scheduled and must refresh!) |
| ++ if (lsnValue > getLastFlushedLsn().get()) { |
| ++ |
| + // get the log record length |
| + logPages[pageIndex].getBytes(pageContent, 0, pageContent.length); |
| + byte logType = pageContent[pageOffset + 4]; |
| +@@ -765,9 +624,7 @@ |
| + int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize(); |
| + logRecord = new byte[logRecordSize]; |
| + |
| +- /* |
| +- * copy the log record content |
| +- */ |
| ++ // copy the log record content |
| + System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize); |
| + MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord); |
| + if (logicalLogLocator == null) { |
| +@@ -790,9 +647,7 @@ |
| } |
| -- // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener |
| -- for (JobSpecification jobspec : jobsToExecute) { |
| -- runJob(hcc, jobspec); |
| -- } |
| } |
| - return executionResult; |
| + |
| +- /* |
| +- * the log record is residing on the disk, read it from there. |
| +- */ |
| ++ // the log record is residing on the disk, read it from there. |
| + readDiskLog(lsnValue, logicalLogLocator); |
| } |
| -@@ -289,398 +287,802 @@ |
| |
| - private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException { |
| -- DataverseDecl dvd = (DataverseDecl) stmt; |
| -- String dvName = dvd.getDataverseName().getValue(); |
| -- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| -- if (dv == null) { |
| -- throw new MetadataException("Unknown dataverse " + dvName); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireReadLatch(); |
| -+ |
| -+ try { |
| -+ DataverseDecl dvd = (DataverseDecl) stmt; |
| -+ String dvName = dvd.getDataverseName().getValue(); |
| -+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| -+ if (dv == null) { |
| -+ throw new MetadataException("Unknown dataverse " + dvName); |
| -+ } |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ return dv; |
| -+ } catch (Exception e) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ throw new MetadataException(e); |
| -+ } finally { |
| -+ releaseReadLatch(); |
| - } |
| -- return dv; |
| +@@ -860,30 +715,40 @@ |
| + return logPageOwnerCount[pageIndex]; |
| } |
| |
| - private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException, |
| - ACIDException { |
| -- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt; |
| -- String dvName = stmtCreateDataverse.getDataverseName().getValue(); |
| -- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| -- if (dv != null && !stmtCreateDataverse.getIfNotExists()) { |
| -- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists."); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt; |
| -+ String dvName = stmtCreateDataverse.getDataverseName().getValue(); |
| -+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName); |
| -+ if (dv != null && !stmtCreateDataverse.getIfNotExists()) { |
| -+ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists."); |
| -+ } |
| -+ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName, |
| -+ stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP)); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| -- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName, |
| -- stmtCreateDataverse.getFormat())); |
| +- public ICommitResolver getCommitResolver() { |
| +- return commitResolver; |
| +- } |
| +- |
| +- public CommitRequestStatistics getCommitRequestStatistics() { |
| +- return commitRequestStatistics; |
| +- } |
| +- |
| + public IFileBasedBuffer[] getLogPages() { |
| + return logPages; |
| } |
| |
| - private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception { |
| -- DatasetDecl dd = (DatasetDecl) stmt; |
| -- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue() |
| -- : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null; |
| -- if (dataverseName == null) { |
| -- throw new AlgebricksException(" dataverse not specified "); |
| -- } |
| -- String datasetName = dd.getName().getValue(); |
| -- DatasetType dsType = dd.getDatasetType(); |
| -- String itemTypeName = dd.getItemTypeName().getValue(); |
| - |
| -- IDatasetDetails datasetDetails = null; |
| -- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| -- datasetName); |
| -- if (ds != null) { |
| -- if (dd.getIfNotExists()) { |
| -- return; |
| -- } else { |
| -- throw new AlgebricksException("A dataset with this name " + datasetName + " already exists."); |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| +- public int getLastFlushedPage() { |
| +- return lastFlushedPage.get(); |
| +- } |
| +- |
| +- public void setLastFlushedPage(int lastFlushedPage) { |
| +- this.lastFlushedPage.set(lastFlushedPage); |
| +- } |
| +- |
| + @Override |
| + public TransactionSubsystem getTransactionSubsystem() { |
| + return provider; |
| + } |
| + |
| -+ try { |
| -+ DatasetDecl dd = (DatasetDecl) stmt; |
| -+ String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue() |
| -+ : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null; |
| -+ if (dataverseName == null) { |
| -+ throw new AlgebricksException(" dataverse not specified "); |
| - } |
| -- } |
| -- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName, |
| -- itemTypeName); |
| -- if (dt == null) { |
| -- throw new AlgebricksException(": type " + itemTypeName + " could not be found."); |
| -- } |
| -- switch (dd.getDatasetType()) { |
| -- case INTERNAL: { |
| -- IAType itemType = dt.getDatatype(); |
| -- if (itemType.getTypeTag() != ATypeTag.RECORD) { |
| -- throw new AlgebricksException("Can only partition ARecord's."); |
| -+ String datasetName = dd.getName().getValue(); |
| -+ DatasetType dsType = dd.getDatasetType(); |
| -+ String itemTypeName = dd.getItemTypeName().getValue(); |
| ++ public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException { |
| ++ TransactionContext ctx = null; |
| ++ int count = 0; |
| ++ int i = 0; |
| + |
| -+ IDatasetDetails datasetDetails = null; |
| -+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| -+ datasetName); |
| -+ if (ds != null) { |
| -+ if (dd.getIfNotExists()) { |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ return; |
| -+ } else { |
| -+ throw new AlgebricksException("A dataset with this name " + datasetName + " already exists."); |
| - } |
| -- List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()) |
| -- .getPartitioningExprs(); |
| -- String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue(); |
| -- datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| -- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName); |
| -- break; |
| - } |
| -- case EXTERNAL: { |
| -- String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); |
| -- Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); |
| -- datasetDetails = new ExternalDatasetDetails(adapter, properties); |
| -- break; |
| -+ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName, |
| -+ itemTypeName); |
| -+ if (dt == null) { |
| -+ throw new AlgebricksException(": type " + itemTypeName + " could not be found."); |
| - } |
| -- case FEED: { |
| -- IAType itemType = dt.getDatatype(); |
| -- if (itemType.getTypeTag() != ATypeTag.RECORD) { |
| -- throw new AlgebricksException("Can only partition ARecord's."); |
| -+ switch (dd.getDatasetType()) { |
| -+ case INTERNAL: { |
| -+ IAType itemType = dt.getDatatype(); |
| -+ if (itemType.getTypeTag() != ATypeTag.RECORD) { |
| -+ throw new AlgebricksException("Can only partition ARecord's."); |
| ++ HashMap<TransactionContext, Integer> map = activeTxnCountMaps.get(pageIndex); |
| ++ Set<Map.Entry<TransactionContext, Integer>> entrySet = map.entrySet(); |
| ++ if (entrySet != null) { |
| ++ for (Map.Entry<TransactionContext, Integer> entry : entrySet) { |
| ++ if (entry != null) { |
| ++ if (entry.getValue() != null) { |
| ++ count = entry.getValue(); |
| + } |
| -+ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()) |
| -+ .getPartitioningExprs(); |
| -+ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue(); |
| -+ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| -+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, |
| -+ ngName); |
| -+ break; |
| - } |
| -- List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs(); |
| -- String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue(); |
| -- String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname(); |
| -- Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration(); |
| -- FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature(); |
| -- datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| -- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName, |
| -- adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString()); |
| -- break; |
| -+ case EXTERNAL: { |
| -+ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); |
| -+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); |
| -+ datasetDetails = new ExternalDatasetDetails(adapter, properties); |
| -+ break; |
| -+ } |
| -+ case FEED: { |
| -+ IAType itemType = dt.getDatatype(); |
| -+ if (itemType.getTypeTag() != ATypeTag.RECORD) { |
| -+ throw new AlgebricksException("Can only partition ARecord's."); |
| ++ if (count > 0) { |
| ++ ctx = entry.getKey(); |
| ++ for (i = 0; i < count; i++) { |
| ++ ctx.decreaseActiveTransactionCountOnIndexes(); |
| ++ } |
| + } |
| -+ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()) |
| -+ .getPartitioningExprs(); |
| -+ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue(); |
| -+ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname(); |
| -+ Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()) |
| -+ .getConfiguration(); |
| -+ FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature(); |
| -+ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE, |
| -+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, |
| -+ ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString()); |
| -+ break; |
| + } |
| - } |
| -+ |
| -+ //#. add a new dataset with PendingAddOp |
| -+ Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType, |
| -+ DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP); |
| -+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset); |
| -+ |
| -+ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) { |
| -+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), |
| -+ dataverseName); |
| -+ JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName, |
| -+ metadataProvider); |
| -+ |
| -+ //#. make metadataTxn commit before calling runJob. |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ //#. runJob |
| -+ runJob(hcc, jobSpec); |
| -+ |
| -+ //#. begin new metadataTxn |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| + } |
| ++ } |
| + |
| -+ //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp |
| -+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName); |
| -+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName, |
| -+ datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(), |
| -+ IMetadataEntity.PENDING_NO_OP)); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| -- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName, |
| -- datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId())); |
| -- if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) { |
| -- Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), |
| -- dataverseName); |
| -- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider)); |
| -- } |
| - } |
| ++ map.clear(); |
| ++ } |
| + } |
| |
| - private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt; |
| -- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue(); |
| -- if (dataverseName == null) { |
| -- throw new AlgebricksException(" dataverse not specified "); |
| -- } |
| -- String datasetName = stmtCreateIndex.getDatasetName().getValue(); |
| -- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| -- datasetName); |
| -- if (ds == null) { |
| -- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| -- + dataverseName); |
| -- } |
| -- String indexName = stmtCreateIndex.getIndexName().getValue(); |
| -- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, |
| -- datasetName, indexName); |
| -- if (idx != null) { |
| -- if (!stmtCreateIndex.getIfNotExists()) { |
| -- throw new AlgebricksException("An index with this name " + indexName + " already exists."); |
| -- } else { |
| -- stmtCreateIndex.setNeedToCreate(false); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt; |
| -+ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue(); |
| -+ if (dataverseName == null) { |
| -+ throw new AlgebricksException(" dataverse not specified "); |
| - } |
| -- } else { |
| -+ String datasetName = stmtCreateIndex.getDatasetName().getValue(); |
| -+ |
| -+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, |
| -+ datasetName); |
| -+ if (ds == null) { |
| -+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| -+ + dataverseName); |
| -+ } |
| -+ |
| -+ String indexName = stmtCreateIndex.getIndexName().getValue(); |
| -+ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, |
| -+ datasetName, indexName); |
| -+ |
| -+ if (idx != null) { |
| -+ if (!stmtCreateIndex.getIfNotExists()) { |
| -+ throw new AlgebricksException("An index with this name " + indexName + " already exists."); |
| -+ } else { |
| -+ stmtCreateIndex.setNeedToCreate(false); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ return; |
| -+ } |
| -+ } |
| -+ |
| -+ //#. add a new index with PendingAddOp |
| - Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), |
| -- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false); |
| -+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false, |
| -+ IMetadataEntity.PENDING_ADD_OP); |
| - MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); |
| -- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider); |
| + /* |
| +@@ -895,36 +760,82 @@ |
| + class LogPageFlushThread extends Thread { |
| |
| -+ //#. create the index artifact in NC. |
| - CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, |
| - index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType()); |
| -- JobSpecification loadIndexJobSpec = IndexOperations |
| -- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider); |
| -- runJob(hcc, loadIndexJobSpec); |
| -+ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider); |
| -+ if (spec == null) { |
| -+ throw new AsterixException("Failed to create job spec for creating index '" |
| -+ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'"); |
| -+ } |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ runJob(hcc, spec); |
| -+ |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ |
| -+ //#. load data into the index in NC. |
| -+ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(), |
| -+ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType()); |
| -+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ runJob(hcc, spec); |
| -+ |
| -+ //#. begin new metadataTxn |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ |
| -+ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp |
| -+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, |
| -+ indexName); |
| -+ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), |
| -+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false, |
| -+ IMetadataEntity.PENDING_NO_OP); |
| -+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| - } |
| + private LogManager logManager; |
| ++ /* |
| ++ * pendingFlushRequests is a map with key as Integer denoting the page |
| ++ * index. When a (transaction) thread discovers the need to flush a page, it |
| ++ * puts its Thread object into the corresponding value that is a |
| ++ * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans |
| ++ * this map in order of page index (and circling around). The flusher thread |
| ++ * needs to flush pages in order and waits for a thread to deposit an object |
| ++ * in the blocking queue corresponding to the next page in order. A request |
| ++ * to flush a page is conveyed to the flush thread by simply depositing an |
| ++ * object in to corresponding blocking queue. It is blocking in the sense |
| ++ * that the flusher thread will continue to wait for an object to arrive in |
| ++ * the queue. The object itself is ignored by the fliusher and just acts as |
| ++ * a signal/event that a page needs to be flushed. |
| ++ */ |
| ++ private final LinkedBlockingQueue<Object>[] flushRequestQueue; |
| ++ private final Object[] flushRequests; |
| ++ private int lastFlushedPageIndex; |
| ++ private final long groupCommitWaitPeriod; |
| |
| - private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException, |
| - MetadataException { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- TypeDecl stmtCreateType = (TypeDecl) stmt; |
| -- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue(); |
| -- if (dataverseName == null) { |
| -- throw new AlgebricksException(" dataverse not specified "); |
| -- } |
| -- String typeName = stmtCreateType.getIdent().getValue(); |
| -- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| -- if (dv == null) { |
| -- throw new AlgebricksException("Unknonw dataverse " + dataverseName); |
| -- } |
| -- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| -- if (dt != null) { |
| -- if (!stmtCreateType.getIfNotExists()) |
| -- throw new AlgebricksException("A datatype with this name " + typeName + " already exists."); |
| -- } else { |
| -- if (builtinTypeMap.get(typeName) != null) { |
| -- throw new AlgebricksException("Cannot redefine builtin type " + typeName + "."); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ TypeDecl stmtCreateType = (TypeDecl) stmt; |
| -+ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue(); |
| -+ if (dataverseName == null) { |
| -+ throw new AlgebricksException(" dataverse not specified "); |
| -+ } |
| -+ String typeName = stmtCreateType.getIdent().getValue(); |
| -+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| -+ if (dv == null) { |
| -+ throw new AlgebricksException("Unknonw dataverse " + dataverseName); |
| -+ } |
| -+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| -+ if (dt != null) { |
| -+ if (!stmtCreateType.getIfNotExists()) { |
| -+ throw new AlgebricksException("A datatype with this name " + typeName + " already exists."); |
| -+ } |
| - } else { |
| -- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt, |
| -- dataverseName); |
| -- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName); |
| -- IAType type = typeMap.get(typeSignature); |
| -- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false)); |
| -+ if (builtinTypeMap.get(typeName) != null) { |
| -+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + "."); |
| -+ } else { |
| -+ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt, |
| -+ dataverseName); |
| -+ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName); |
| -+ IAType type = typeMap.get(typeSignature); |
| -+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false)); |
| -+ } |
| - } |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| + public LogPageFlushThread(LogManager logManager) { |
| + this.logManager = logManager; |
| + setName("Flusher"); |
| ++ int numLogPages = logManager.getLogManagerProperties().getNumLogPages(); |
| ++ this.flushRequestQueue = new LinkedBlockingQueue[numLogPages]; |
| ++ this.flushRequests = new Object[numLogPages]; |
| ++ for (int i = 0; i < numLogPages; i++) { |
| ++ flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1); |
| ++ flushRequests[i] = new Object(); |
| ++ } |
| ++ this.lastFlushedPageIndex = -1; |
| ++ groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod(); |
| } |
| |
| - private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt; |
| -- String dvName = stmtDelete.getDataverseName().getValue(); |
| - |
| -- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName); |
| -- if (dv == null) { |
| -- if (!stmtDelete.getIfExists()) { |
| -- throw new AlgebricksException("There is no dataverse with this name " + dvName + "."); |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt; |
| -+ String dvName = stmtDelete.getDataverseName().getValue(); |
| -+ |
| -+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName); |
| -+ if (dv == null) { |
| -+ if (!stmtDelete.getIfExists()) { |
| -+ throw new AlgebricksException("There is no dataverse with this name " + dvName + "."); |
| -+ } |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| ++ public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) { |
| ++ synchronized (logManager.getLogPage(pageIndex)) { |
| ++ //return if flushedLSN >= lsn |
| ++ if (logManager.getLastFlushedLsn().get() >= lsn) { |
| + return; |
| - } |
| -- } else { |
| -+ |
| -+ //#. prepare jobs which will drop corresponding datasets with indexes. |
| - List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName); |
| - for (int j = 0; j < datasets.size(); j++) { |
| - String datasetName = datasets.get(j).getDatasetName(); |
| - DatasetType dsType = datasets.get(j).getDatasetType(); |
| - if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) { |
| -+ |
| - List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName); |
| - for (int k = 0; k < indexes.size(); k++) { |
| - if (indexes.get(k).isSecondaryIndex()) { |
| -- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(), |
| -- metadataProvider); |
| -+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName, |
| -+ indexes.get(k).getIndexName()); |
| -+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
| - } |
| - } |
| -+ |
| -+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName); |
| -+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); |
| - } |
| -- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider); |
| - } |
| - |
| -+ //#. mark PendingDropOp on the dataverse record by |
| -+ // first, deleting the dataverse record from the DATAVERSE_DATASET |
| -+ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET |
| - MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName); |
| -+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(), |
| -+ IMetadataEntity.PENDING_DROP_OP)); |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ for (JobSpecification jobSpec : jobsToExecute) { |
| -+ runJob(hcc, jobSpec); |
| + } |
| + |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| ++ //put a new request to the queue only if the request on the page is not in the queue. |
| ++ flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]); |
| + |
| -+ //#. finally, delete the dataverse. |
| -+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName); |
| - if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) { |
| - activeDefaultDataverse = null; |
| - } |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| ++ //return if the request is asynchronous |
| ++ if (!isSynchronous) { |
| ++ return; |
| + } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| - } |
| - |
| - private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- DropStatement stmtDelete = (DropStatement) stmt; |
| -- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue(); |
| -- if (dataverseName == null) { |
| -- throw new AlgebricksException(" dataverse not specified "); |
| -- } |
| -- String datasetName = stmtDelete.getDatasetName().getValue(); |
| -- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| -- if (ds == null) { |
| -- if (!stmtDelete.getIfExists()) |
| -- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| -- + dataverseName + "."); |
| -- } else { |
| + |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ DropStatement stmtDelete = (DropStatement) stmt; |
| -+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue(); |
| -+ if (dataverseName == null) { |
| -+ throw new AlgebricksException(" dataverse not specified "); |
| -+ } |
| -+ String datasetName = stmtDelete.getDatasetName().getValue(); |
| -+ |
| -+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| -+ if (ds == null) { |
| -+ if (!stmtDelete.getIfExists()) { |
| -+ throw new AlgebricksException("There is no dataset with this name " + datasetName |
| -+ + " in dataverse " + dataverseName + "."); |
| ++ //wait until there is flush. |
| ++ boolean isNotified = false; |
| ++ while (!isNotified) { |
| ++ try { |
| ++ logManager.getLogPage(pageIndex).wait(); |
| ++ isNotified = true; |
| ++ } catch (InterruptedException e) { |
| ++ e.printStackTrace(); |
| + } |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ return; |
| + } |
| ++ } |
| ++ } |
| + |
| - if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) { |
| -+ |
| -+ //#. prepare jobs to drop the datatset and the indexes in NC |
| - List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); |
| - for (int j = 0; j < indexes.size(); j++) { |
| -- if (indexes.get(j).isPrimaryIndex()) { |
| -- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(), |
| -- metadataProvider); |
| -+ if (indexes.get(j).isSecondaryIndex()) { |
| -+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, |
| -+ indexes.get(j).getIndexName()); |
| -+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
| + @Override |
| + public void run() { |
| + while (true) { |
| + try { |
| +- int pageToFlush = logManager.getNextPageInSequence(logManager.getLastFlushedPage()); |
| ++ int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex); |
| + |
| +- /* |
| +- * A wait call on the linkedBLockingQueue. The flusher thread is |
| +- * notified when an object is added to the queue. Please note |
| +- * that each page has an associated blocking queue. |
| +- */ |
| +- logManager.getPendingFlushRequests(pageToFlush).take(); |
| ++ // A wait call on the linkedBLockingQueue. The flusher thread is |
| ++ // notified when an object is added to the queue. Please note |
| ++ // that each page has an associated blocking queue. |
| ++ flushRequestQueue[pageToFlush].take(); |
| + |
| +- /* |
| +- * The LogFlusher was waiting for a page to be marked as a |
| +- * candidate for flushing. Now that has happened. The thread |
| +- * shall proceed to take a lock on the log page |
| +- */ |
| +- synchronized (logManager.getLogPages()[pageToFlush]) { |
| ++ synchronized (logManager.getLogPage(pageToFlush)) { |
| + |
| +- /* |
| +- * lock the internal state of the log manager and create a |
| +- * log file if necessary. |
| +- */ |
| ++ // lock the internal state of the log manager and create a |
| ++ // log file if necessary. |
| + int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()); |
| + int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get() |
| + + logManager.getLogManagerProperties().getLogPageSize()); |
| +@@ -936,198 +847,60 @@ |
| + logManager.getLogManagerProperties().getLogPageSize()); |
| } |
| - } |
| -+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); |
| -+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider)); |
| -+ |
| -+ //#. mark the existing dataset as PendingDropOp |
| -+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
| -+ MetadataManager.INSTANCE.addDataset( |
| -+ mdTxnCtx, |
| -+ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds |
| -+ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP)); |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ //#. run the jobs |
| -+ for (JobSpecification jobSpec : jobsToExecute) { |
| -+ runJob(hcc, jobSpec); |
| -+ } |
| -+ |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| - } |
| -- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider); |
| -+ |
| -+ //#. finally, delete the dataset. |
| -+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| - } |
| |
| - private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; |
| -- String datasetName = stmtIndexDrop.getDatasetName().getValue(); |
| -- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue(); |
| -- if (dataverseName == null) { |
| -- throw new AlgebricksException(" dataverse not specified "); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt; |
| -+ String datasetName = stmtIndexDrop.getDatasetName().getValue(); |
| -+ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue(); |
| -+ if (dataverseName == null) { |
| -+ throw new AlgebricksException(" dataverse not specified "); |
| -+ } |
| -+ |
| -+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| -+ if (ds == null) { |
| -+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| -+ + dataverseName); |
| -+ } |
| -+ |
| -+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) { |
| -+ String indexName = stmtIndexDrop.getIndexName().getValue(); |
| -+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| -+ if (index == null) { |
| -+ if (!stmtIndexDrop.getIfExists()) { |
| -+ throw new AlgebricksException("There is no index with this name " + indexName + "."); |
| -+ } |
| -+ } else { |
| -+ //#. prepare a job to drop the index in NC. |
| -+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, |
| -+ indexName); |
| -+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
| -+ |
| -+ //#. mark PendingDropOp on the existing index |
| -+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| -+ MetadataManager.INSTANCE.addIndex( |
| -+ mdTxnCtx, |
| -+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index |
| -+ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); |
| -+ |
| -+ //#. commit the existing transaction before calling runJob. |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ for (JobSpecification jobSpec : jobsToExecute) { |
| -+ runJob(hcc, jobSpec); |
| -+ } |
| -+ |
| -+ //#. begin a new transaction |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ |
| -+ //#. finally, delete the existing index |
| -+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| -+ } |
| -+ } else { |
| -+ throw new AlgebricksException(datasetName |
| -+ + " is an external dataset. Indexes are not maintained for external datasets."); |
| -+ } |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| -- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| -- if (ds == null) |
| -- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse " |
| -- + dataverseName); |
| -- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) { |
| -- String indexName = stmtIndexDrop.getIndexName().getValue(); |
| -- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| -- if (idx == null) { |
| -- if (!stmtIndexDrop.getIfExists()) |
| -- throw new AlgebricksException("There is no index with this name " + indexName + "."); |
| -- } else |
| -- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider); |
| -- } else { |
| -- throw new AlgebricksException(datasetName |
| -- + " is an external dataset. Indexes are not maintained for external datasets."); |
| -- } |
| - } |
| +- logManager.getLogPage(pageToFlush).flush(); // put the |
| +- // content to |
| +- // disk, the |
| +- // thread still |
| +- // has a lock on |
| +- // the log page |
| ++ //#. sleep during the groupCommitWaitTime |
| ++ sleep(groupCommitWaitPeriod); |
| |
| - private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException, |
| - ACIDException { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt; |
| -- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue(); |
| -- if (dataverseName == null) { |
| -- throw new AlgebricksException(" dataverse not specified "); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt; |
| -+ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue(); |
| -+ if (dataverseName == null) { |
| -+ throw new AlgebricksException(" dataverse not specified "); |
| -+ } |
| -+ String typeName = stmtTypeDrop.getTypeName().getValue(); |
| -+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| -+ if (dt == null) { |
| -+ if (!stmtTypeDrop.getIfExists()) |
| -+ throw new AlgebricksException("There is no datatype with this name " + typeName + "."); |
| -+ } else { |
| -+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName); |
| -+ } |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| -- String typeName = stmtTypeDrop.getTypeName().getValue(); |
| -- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName); |
| -- if (dt == null) { |
| -- if (!stmtTypeDrop.getIfExists()) |
| -- throw new AlgebricksException("There is no datatype with this name " + typeName + "."); |
| -- } else { |
| -- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName); |
| -- } |
| - } |
| +- /* |
| +- * acquire lock on the log manager as we need to update the |
| +- * internal bookkeeping data. |
| +- */ |
| ++ //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page. |
| ++ logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE); |
| |
| - private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException, |
| - ACIDException { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt; |
| -- String nodegroupName = stmtDelete.getNodeGroupName().getValue(); |
| -- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName); |
| -- if (ng == null) { |
| -- if (!stmtDelete.getIfExists()) |
| -- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + "."); |
| -- } else { |
| -- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt; |
| -+ String nodegroupName = stmtDelete.getNodeGroupName().getValue(); |
| -+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName); |
| -+ if (ng == null) { |
| -+ if (!stmtDelete.getIfExists()) |
| -+ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + "."); |
| -+ } else { |
| -+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName); |
| -+ } |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| - } |
| +- // increment the last flushed lsn. |
| +- long lastFlushedLsn = logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties() |
| +- .getLogPageSize()); |
| ++ //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER) |
| ++ // meaning every one has finished writing logs on this page. |
| ++ while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) { |
| ++ sleep(0); |
| ++ } |
| |
| - private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException, |
| - ACIDException { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt; |
| -- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace(); |
| -- if (dataverse == null) { |
| -- throw new AlgebricksException(" dataverse not specified "); |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt; |
| -+ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace(); |
| -+ if (dataverse == null) { |
| -+ throw new AlgebricksException(" dataverse not specified "); |
| -+ } |
| -+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse); |
| -+ if (dv == null) { |
| -+ throw new AlgebricksException("There is no dataverse with this name " + dataverse + "."); |
| -+ } |
| -+ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction() |
| -+ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), |
| -+ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString()); |
| -+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function); |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| -- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse); |
| -- if (dv == null) { |
| -- throw new AlgebricksException("There is no dataverse with this name " + dataverse + "."); |
| -- } |
| -- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction() |
| -- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), |
| -- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString()); |
| -- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function); |
| - } |
| +- /* |
| +- * the log manager gains back ownership of the page. this is |
| +- * reflected by incrementing the owner count of the page. |
| +- * recall that when the page is begin flushed the owner |
| +- * count is actually 0 Value of zero implicitly indicates |
| +- * that the page is operated upon by the log flusher thread. |
| +- */ |
| +- logManager.getLogPageOwnershipCount(pageToFlush).incrementAndGet(); |
| ++ //#. set the logPageOwnerCount to 0 (LOG_FLUSHER) |
| ++ // meaning it is flushing. |
| ++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER); |
| |
| - private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException, |
| - AlgebricksException { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt; |
| -- FunctionSignature signature = stmtDropFunction.getFunctionSignature(); |
| -- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature); |
| -- if (function == null) { |
| -- if (!stmtDropFunction.getIfExists()) |
| -- throw new AlgebricksException("Unknonw function " + signature); |
| -- } else { |
| -- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature); |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt; |
| -+ FunctionSignature signature = stmtDropFunction.getFunctionSignature(); |
| -+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature); |
| -+ if (function == null) { |
| -+ if (!stmtDropFunction.getIfExists()) |
| -+ throw new AlgebricksException("Unknonw function " + signature); |
| -+ } else { |
| -+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature); |
| -+ } |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| - } |
| - } |
| +- /* |
| +- * get the number of log buffers that have been written so |
| +- * far. A log buffer = number of log pages * size of a log |
| +- * page |
| +- */ |
| +- int numCycles = (int) lastFlushedLsn / logManager.getLogManagerProperties().getLogBufferSize(); |
| +- if (lastFlushedLsn % logManager.getLogManagerProperties().getLogBufferSize() == 0) { |
| +- numCycles--; |
| +- } |
| ++ // put the content to disk (the thread still has a lock on the log page) |
| ++ logManager.getLogPage(pageToFlush).flush(); |
| |
| - private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt; |
| -- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue(); |
| -- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName() |
| -- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted()); |
| +- /* |
| +- * Map the log page to a new region in the log file. |
| +- */ |
| ++ // increment the last flushed lsn and lastFlushedPage |
| ++ logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize()); |
| ++ lastFlushedPageIndex = pageToFlush; |
| |
| -- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName); |
| -- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format); |
| -- jobsToExecute.add(job.getJobSpec()); |
| -- // Also load the dataset's secondary indexes. |
| -- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt |
| -- .getDatasetName().getValue()); |
| -- for (Index index : datasetIndexes) { |
| -- if (!index.isSecondaryIndex()) { |
| -- continue; |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireReadLatch(); |
| ++ // decrement activeTxnCountOnIndexes |
| ++ logManager.decrementActiveTxnCountOnIndexes(pageToFlush); |
| + |
| -+ try { |
| -+ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt; |
| -+ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue(); |
| -+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt |
| -+ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), |
| -+ loadStmt.dataIsAlreadySorted()); |
| ++ // reset the count to 1 |
| ++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER); |
| + |
| -+ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName); |
| -+ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format); |
| -+ jobsToExecute.add(job.getJobSpec()); |
| -+ // Also load the dataset's secondary indexes. |
| -+ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt |
| -+ .getDatasetName().getValue()); |
| -+ for (Index index : datasetIndexes) { |
| -+ if (!index.isSecondaryIndex()) { |
| -+ continue; |
| -+ } |
| -+ // Create CompiledCreateIndexStatement from metadata entity 'index'. |
| -+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), |
| -+ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), |
| -+ index.getIndexType()); |
| -+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider)); |
| - } |
| -- // Create CompiledCreateIndexStatement from metadata entity 'index'. |
| -- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, |
| -- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType()); |
| -- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider)); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ for (JobSpecification jobspec : jobsToExecute) { |
| -+ runJob(hcc, jobspec); |
| -+ } |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseReadLatch(); |
| - } |
| - } |
| ++ // Map the log page to a new region in the log file. |
| + long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition() |
| + + logManager.getLogManagerProperties().getLogBufferSize(); |
| |
| - private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- metadataProvider.setWriteTransaction(true); |
| -- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt; |
| -- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue(); |
| -- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1 |
| -- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter()); |
| +- /* |
| +- * long nextPos = (numCycles + 1) |
| +- * logManager.getLogManagerProperties() .getLogBufferSize() |
| +- * + pageToFlush logManager.getLogManagerProperties() |
| +- * .getLogPageSize(); |
| +- */ |
| + logManager.resetLogPage(nextWritePosition, pageToFlush); |
| |
| -- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs); |
| -- if (compiled.first != null) { |
| -- jobsToExecute.add(compiled.first); |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireReadLatch(); |
| -+ |
| -+ try { |
| -+ metadataProvider.setWriteTransaction(true); |
| -+ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt; |
| -+ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue(); |
| -+ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1 |
| -+ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter()); |
| -+ |
| -+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), |
| -+ clfrqs); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ if (compiled.first != null) { |
| -+ runJob(hcc, compiled.first); |
| -+ } |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseReadLatch(); |
| - } |
| - } |
| + // mark the page as ACTIVE |
| + logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE); |
| |
| - private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- metadataProvider.setWriteTransaction(true); |
| -- InsertStatement stmtInsert = (InsertStatement) stmt; |
| -- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue(); |
| -- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName() |
| -- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter()); |
| -- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs); |
| -- if (compiled.first != null) { |
| -- jobsToExecute.add(compiled.first); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireReadLatch(); |
| -+ |
| -+ try { |
| -+ metadataProvider.setWriteTransaction(true); |
| -+ InsertStatement stmtInsert = (InsertStatement) stmt; |
| -+ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue(); |
| -+ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName() |
| -+ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter()); |
| -+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), |
| -+ clfrqs); |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ if (compiled.first != null) { |
| -+ runJob(hcc, compiled.first); |
| -+ } |
| -+ |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseReadLatch(); |
| - } |
| - } |
| +- // notify all waiting (transaction) threads. |
| +- // Transaction thread may be waiting for the page to be |
| +- // available or may have a commit log record on the page |
| +- // that got flushed. |
| +- logManager.getLogPages()[pageToFlush].notifyAll(); |
| +- logManager.setLastFlushedPage(pageToFlush); |
| ++ //#. checks the queue whether there is another flush request on the same log buffer |
| ++ // If there is another request, then simply remove it. |
| ++ if (flushRequestQueue[pageToFlush].peek() != null) { |
| ++ flushRequestQueue[pageToFlush].take(); |
| ++ } |
| |
| - private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- metadataProvider.setWriteTransaction(true); |
| -- DeleteStatement stmtDelete = (DeleteStatement) stmt; |
| -- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue(); |
| -- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName, |
| -- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(), |
| -- stmtDelete.getVarCounter(), metadataProvider); |
| -- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs); |
| -- if (compiled.first != null) { |
| -- jobsToExecute.add(compiled.first); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireReadLatch(); |
| -+ |
| -+ try { |
| -+ metadataProvider.setWriteTransaction(true); |
| -+ DeleteStatement stmtDelete = (DeleteStatement) stmt; |
| -+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue(); |
| -+ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName, |
| -+ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(), |
| -+ stmtDelete.getVarCounter(), metadataProvider); |
| -+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), |
| -+ clfrqs); |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ if (compiled.first != null) { |
| -+ runJob(hcc, compiled.first); |
| -+ } |
| -+ |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseReadLatch(); |
| ++ // notify all waiting (transaction) threads. |
| ++ logManager.getLogPage(pageToFlush).notifyAll(); |
| + } |
| + } catch (IOException ioe) { |
| + ioe.printStackTrace(); |
| + throw new Error(" exception in flushing log page", ioe); |
| + } catch (InterruptedException e) { |
| + e.printStackTrace(); |
| +- break; // must break from the loop as the exception indicates |
| +- // some thing horrendous has happened elsewhere |
| ++ break; |
| + } |
| } |
| } |
| +-} |
| +- |
| +-/* |
| +- * TODO: By default the commit policy is to commit at each request and not have |
| +- * a group commit. The following code needs to change to support group commit. |
| +- * The code for group commit has not been tested thoroughly and is under |
| +- * development. |
| +- */ |
| +-class BasicCommitResolver implements ICommitResolver { |
| +- |
| +- public boolean shouldCommitPage(int pageIndex, LogManager logManager, |
| +- CommitRequestStatistics commitRequestStatistics) { |
| +- return true; |
| +- } |
| +- |
| +- public void init(LogManager logManager) { |
| +- } |
| +-} |
| +- |
| +-class GroupCommitResolver implements ICommitResolver { |
| +- |
| +- public boolean shouldCommitPage(int pageIndex, LogManager logManager, |
| +- CommitRequestStatistics commitRequestStatistics) { |
| +- long maxCommitWait = logManager.getLogManagerProperties().getGroupCommitWaitPeriod(); |
| +- long timestamp = commitRequestStatistics.getPageLevelLastCommitRequestTimestamp(pageIndex); |
| +- if (timestamp == -1) { |
| +- if (maxCommitWait == 0) { |
| +- return true; |
| +- } else { |
| +- timestamp = System.currentTimeMillis(); |
| +- } |
| +- } |
| +- long currenTime = System.currentTimeMillis(); |
| +- if (currenTime - timestamp > maxCommitWait) { |
| +- return true; |
| +- } |
| +- return false; |
| +- } |
| +- |
| +- public void init(LogManager logManager) { |
| +- GroupCommitHandlerThread groupCommitHandler = new GroupCommitHandlerThread(logManager); |
| +- groupCommitHandler.setDaemon(true); |
| +- groupCommitHandler.start(); |
| +- } |
| +- |
| +- class GroupCommitHandlerThread extends Thread { |
| +- |
| +- private LogManager logManager; |
| +- |
| +- public GroupCommitHandlerThread(LogManager logManager) { |
| +- this.logManager = logManager; |
| +- setName("Group Commit Handler"); |
| +- } |
| +- |
| +- @Override |
| +- public void run() { |
| +- int pageIndex = -1; |
| +- while (true) { |
| +- pageIndex = logManager.getNextPageInSequence(pageIndex); |
| +- long lastCommitRequeestTimestamp = logManager.getCommitRequestStatistics() |
| +- .getPageLevelLastCommitRequestTimestamp(pageIndex); |
| +- if (lastCommitRequeestTimestamp != -1 |
| +- && System.currentTimeMillis() - lastCommitRequeestTimestamp > logManager |
| +- .getLogManagerProperties().getGroupCommitWaitPeriod()) { |
| +- int dirtyCount = logManager.getLogPageOwnershipCount(pageIndex).decrementAndGet(); |
| +- if (dirtyCount == 0) { |
| +- try { |
| +- logManager.getLogPageStatus(pageIndex).set(LogManager.PageState.INACTIVE); |
| +- logManager.getPendingFlushRequests(pageIndex).put(Thread.currentThread()); |
| +- } catch (InterruptedException e) { |
| +- e.printStackTrace(); |
| +- break; |
| +- } |
| +- logManager.getCommitRequestStatistics().committedPage(pageIndex); |
| +- } |
| +- } |
| +- } |
| +- } |
| +- } |
| +- |
| +-} |
| +- |
| +-interface ICommitResolver { |
| +- public boolean shouldCommitPage(int pageIndex, LogManager logManager, |
| +- CommitRequestStatistics commitRequestStatistics); |
| +- |
| +- public void init(LogManager logManager); |
| +-} |
| +- |
| +-/** |
| +- * Represents a collection of all commit requests by transactions for each log |
| +- * page. The requests are accumulated until the commit policy triggers a flush |
| +- * of the corresponding log page. Upon a flush of a page, all commit requests |
| +- * for the page are cleared. |
| +- */ |
| +-class CommitRequestStatistics { |
| +- |
| +- AtomicInteger[] pageLevelCommitRequestCount; |
| +- AtomicLong[] pageLevelLastCommitRequestTimestamp; |
| +- |
| +- public CommitRequestStatistics(int numPages) { |
| +- pageLevelCommitRequestCount = new AtomicInteger[numPages]; |
| +- pageLevelLastCommitRequestTimestamp = new AtomicLong[numPages]; |
| +- for (int i = 0; i < numPages; i++) { |
| +- pageLevelCommitRequestCount[i] = new AtomicInteger(0); |
| +- pageLevelLastCommitRequestTimestamp[i] = new AtomicLong(-1L); |
| +- } |
| +- } |
| +- |
| +- public void registerCommitRequest(int pageIndex) { |
| +- pageLevelCommitRequestCount[pageIndex].incrementAndGet(); |
| +- pageLevelLastCommitRequestTimestamp[pageIndex].set(System.currentTimeMillis()); |
| +- } |
| +- |
| +- public long getPageLevelLastCommitRequestTimestamp(int pageIndex) { |
| +- return pageLevelLastCommitRequestTimestamp[pageIndex].get(); |
| +- } |
| +- |
| +- public void committedPage(int pageIndex) { |
| +- pageLevelCommitRequestCount[pageIndex].set(0); |
| +- pageLevelLastCommitRequestTimestamp[pageIndex].set(-1L); |
| +- } |
| +- |
| +-} |
| ++} |
| +\ No newline at end of file |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java |
| +=================================================================== |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (working copy) |
| +@@ -152,6 +152,9 @@ |
| + case LogType.UPDATE: |
| + logTypeDisplay = "UPDATE"; |
| + break; |
| ++ case LogType.ENTITY_COMMIT: |
| ++ logTypeDisplay = "ENTITY_COMMIT"; |
| ++ break; |
| + } |
| + builder.append(" LSN : ").append(logicalLogLocator.getLsn()); |
| + builder.append(" Log Type : ").append(logTypeDisplay); |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java |
| +=================================================================== |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (working copy) |
| +@@ -18,5 +18,6 @@ |
| |
| -@@ -704,46 +1106,109 @@ |
| + public static final byte UPDATE = 0; |
| + public static final byte COMMIT = 1; |
| ++ public static final byte ENTITY_COMMIT = 2; |
| |
| - private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- BeginFeedStatement bfs = (BeginFeedStatement) stmt; |
| -- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue(); |
| + } |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java |
| +=================================================================== |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (working copy) |
| +@@ -1,5 +1,5 @@ |
| + /* |
| +- * Copyright 2009-2010 by The Regents of the University of California |
| ++ * Copyright 2009-2012 by The Regents of the University of California |
| + * Licensed under the Apache License, Version 2.0 (the "License"); |
| + * you may not use this file except in compliance with the License. |
| + * you may obtain a copy of the License from |
| +@@ -41,7 +41,7 @@ |
| + private int logPageSize = 128 * 1024; // 128 KB |
| + private int numLogPages = 8; // number of log pages in the log buffer. |
| |
| -- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, |
| -- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter()); |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireReadLatch(); |
| +- private long groupCommitWaitPeriod = 0; // time in milliseconds for which a |
| ++ private long groupCommitWaitPeriod = 1; // time in milliseconds for which a |
| + // commit record will wait before |
| + // the housing page is marked for |
| + // flushing. |
| +Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java |
| +=================================================================== |
| +--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (revision 1194) |
| ++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (working copy) |
| +@@ -184,6 +184,7 @@ |
| + break; |
| |
| -- Dataset dataset; |
| -- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs |
| -- .getDatasetName().getValue()); |
| -- IDatasetDetails datasetDetails = dataset.getDatasetDetails(); |
| -- if (datasetDetails.getDatasetType() != DatasetType.FEED) { |
| -- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset"); |
| -+ try { |
| -+ BeginFeedStatement bfs = (BeginFeedStatement) stmt; |
| -+ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue(); |
| -+ |
| -+ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName() |
| -+ .getValue(), bfs.getQuery(), bfs.getVarCounter()); |
| -+ |
| -+ Dataset dataset; |
| -+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs |
| -+ .getDatasetName().getValue()); |
| -+ IDatasetDetails datasetDetails = dataset.getDatasetDetails(); |
| -+ if (datasetDetails.getDatasetType() != DatasetType.FEED) { |
| -+ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() |
| -+ + " is not a feed dataset"); |
| -+ } |
| -+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset); |
| -+ cbfs.setQuery(bfs.getQuery()); |
| -+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs); |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ if (compiled.first != null) { |
| -+ runJob(hcc, compiled.first); |
| -+ } |
| -+ |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseReadLatch(); |
| - } |
| -- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset); |
| -- cbfs.setQuery(bfs.getQuery()); |
| -- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs); |
| -- if (compiled.first != null) { |
| -- jobsToExecute.add(compiled.first); |
| -- } |
| - } |
| + case LogType.COMMIT: |
| ++ case LogType.ENTITY_COMMIT: |
| + tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), |
| + logRecordHelper.getDatasetId(currentLogLocator), |
| + logRecordHelper.getPKHashValue(currentLogLocator)); |
| +@@ -218,6 +219,7 @@ |
| + IIndex index = null; |
| + LocalResource localResource = null; |
| + ILocalResourceMetadata localResourceMetadata = null; |
| ++ List<Long> resourceIdList = new ArrayList<Long>(); |
| |
| - private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception { |
| -- ControlFeedStatement cfs = (ControlFeedStatement) stmt; |
| -- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue(); |
| -- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName, |
| -- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams()); |
| -- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider)); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireReadLatch(); |
| -+ |
| -+ try { |
| -+ ControlFeedStatement cfs = (ControlFeedStatement) stmt; |
| -+ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null |
| -+ : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue(); |
| -+ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), |
| -+ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams()); |
| -+ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider); |
| -+ |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ runJob(hcc, jobSpec); |
| -+ |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseReadLatch(); |
| -+ } |
| - } |
| + //#. get indexLifeCycleManager |
| + IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); |
| +@@ -272,6 +274,8 @@ |
| + index = localResourceMetadata.createIndexInstance(appRuntimeContext, |
| + localResource.getResourceName(), localResource.getPartition()); |
| + indexLifecycleManager.register(resourceId, index); |
| ++ indexLifecycleManager.open(resourceId); |
| ++ resourceIdList.add(resourceId); |
| + } |
| |
| - private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc, |
| - List<JobSpecification> jobsToExecute) throws Exception { |
| -- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null); |
| -- if (compiled.first != null) { |
| -- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1)); |
| -- jobsToExecute.add(compiled.first); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ boolean bActiveTxn = true; |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireReadLatch(); |
| -+ |
| -+ try { |
| -+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null); |
| -+ |
| -+ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath()); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ bActiveTxn = false; |
| -+ |
| -+ if (compiled.first != null) { |
| -+ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1)); |
| -+ runJob(hcc, compiled.first); |
| -+ } |
| -+ |
| -+ return queryResult; |
| -+ } catch (Exception e) { |
| -+ if (bActiveTxn) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ } |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseReadLatch(); |
| - } |
| -- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath()); |
| - } |
| + /***************************************************/ |
| +@@ -300,6 +304,7 @@ |
| + break; |
| |
| - private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex, |
| -@@ -768,20 +1233,32 @@ |
| - private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt, |
| - List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException, |
| - ACIDException { |
| -- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt; |
| -- String ngName = stmtCreateNodegroup.getNodegroupName().getValue(); |
| -- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName); |
| -- if (ng != null) { |
| -- if (!stmtCreateNodegroup.getIfNotExists()) |
| -- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists."); |
| -- } else { |
| -- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames(); |
| -- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size()); |
| -- for (Identifier id : ncIdentifiers) { |
| -- ncNames.add(id.getValue()); |
| -+ |
| -+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ acquireWriteLatch(); |
| -+ |
| -+ try { |
| -+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt; |
| -+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue(); |
| -+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName); |
| -+ if (ng != null) { |
| -+ if (!stmtCreateNodegroup.getIfNotExists()) |
| -+ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists."); |
| -+ } else { |
| -+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames(); |
| -+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size()); |
| -+ for (Identifier id : ncIdentifiers) { |
| -+ ncNames.add(id.getValue()); |
| -+ } |
| -+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames)); |
| + case LogType.COMMIT: |
| ++ case LogType.ENTITY_COMMIT: |
| + //do nothing |
| + break; |
| + |
| +@@ -308,6 +313,11 @@ |
| } |
| -- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames)); |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ } catch (Exception e) { |
| -+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); |
| -+ throw new AlgebricksException(e); |
| -+ } finally { |
| -+ releaseWriteLatch(); |
| } |
| - } |
| - |
| -@@ -791,10 +1268,37 @@ |
| - |
| - private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName, |
| - String indexName, AqlMetadataProvider metadataProvider) throws Exception { |
| -+ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); |
| -+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| -+ |
| -+ //#. mark PendingDropOp on the existing index |
| -+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| -+ MetadataManager.INSTANCE.addIndex( |
| -+ mdTxnCtx, |
| -+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index |
| -+ .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP)); |
| - CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName); |
| -- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider)); |
| -- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, |
| -- indexName); |
| -+ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider); |
| -+ |
| -+ //#. commit the existing transaction before calling runJob. |
| -+ // the caller should begin the transaction before calling this function. |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ |
| -+ try { |
| -+ runJob(hcc, jobSpec); |
| -+ } catch (Exception e) { |
| -+ //need to create the mdTxnCtx to be aborted by caller properly |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ throw e; |
| + |
| ++ //close all indexes |
| ++ for (long r : resourceIdList) { |
| ++ indexLifecycleManager.close(r); |
| + } |
| -+ |
| -+ //#. begin a new transaction |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ |
| -+ //#. finally, delete the existing index |
| -+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| ++ |
| + JobIdFactory.initJobId(maxJobId); |
| } |
| |
| - private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName, |
| -@@ -803,10 +1307,32 @@ |
| - CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName); |
| - Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName); |
| - if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) { |
| -- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider); |
| -- for (JobSpecification spec : jobSpecs) |
| -- runJob(hcc, spec); |
| -+ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider); |
| -+ |
| -+ //#. mark PendingDropOp on the existing dataset |
| -+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
| -+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(), |
| -+ ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP)); |
| -+ |
| -+ //#. commit the transaction |
| -+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| -+ |
| -+ //#. run the job |
| -+ try { |
| -+ runJob(hcc, jobSpec); |
| -+ } catch (Exception e) { |
| -+ //need to create the mdTxnCtx to be aborted by caller properly |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| -+ throw e; |
| +@@ -539,6 +549,7 @@ |
| + break; |
| + |
| + case LogType.COMMIT: |
| ++ case LogType.ENTITY_COMMIT: |
| + undoLSNSet = loserTxnTable.get(tempKeyTxnId); |
| + if (undoLSNSet != null) { |
| + loserTxnTable.remove(tempKeyTxnId); |
| +Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java |
| +=================================================================== |
| +--- asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (revision 1194) |
| ++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (working copy) |
| +@@ -42,6 +42,16 @@ |
| + List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit(); |
| + for (CompilationUnit cUnit : cUnits) { |
| + File testFile = tcCtx.getTestFile(cUnit); |
| ++ |
| ++ /***************** |
| ++ if (!testFile.getAbsolutePath().contains("meta09.aql")) { |
| ++ System.out.println(testFile.getAbsolutePath()); |
| ++ continue; |
| + } |
| -+ |
| -+ //#. start a new transaction |
| -+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| -+ metadataProvider.setMetadataTxnContext(mdTxnCtx); |
| - } |
| -+ |
| -+ //#. finally, delete the existing dataset. |
| - MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName); |
| - } |
| ++ System.out.println(testFile.getAbsolutePath()); |
| ++ *****************/ |
| ++ |
| ++ |
| + File expectedResultFile = tcCtx.getExpectedResultFile(cUnit); |
| + File actualFile = new File(PATH_ACTUAL + File.separator |
| + + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm"); |
| +Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java |
| +=================================================================== |
| +--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1194) |
| ++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy) |
| +@@ -95,9 +95,10 @@ |
| + File testFile = tcCtx.getTestFile(cUnit); |
| |
| -@@ -831,4 +1357,20 @@ |
| - } |
| - return format; |
| - } |
| -+ |
| -+ private void acquireWriteLatch() { |
| -+ cacheLatch.writeLock().lock(); |
| -+ } |
| -+ |
| -+ private void releaseWriteLatch() { |
| -+ cacheLatch.writeLock().unlock(); |
| -+ } |
| -+ |
| -+ private void acquireReadLatch() { |
| -+ cacheLatch.readLock().lock(); |
| -+ } |
| -+ |
| -+ private void releaseReadLatch() { |
| -+ cacheLatch.readLock().unlock(); |
| -+ } |
| - } |
| + /*************** to avoid run failure cases **************** |
| +- if (!testFile.getAbsolutePath().contains("index-selection/")) { |
| ++ if (!testFile.getAbsolutePath().contains("query-issue205.aql")) { |
| + continue; |
| + } |
| ++ System.out.println(testFile.getAbsolutePath()); |
| + ************************************************************/ |
| + |
| + File expectedResultFile = tcCtx.getExpectedResultFile(cUnit); |