Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (working copy)
@@ -103,12 +103,14 @@
//for entity-level commit
if (PKHashVal != -1) {
transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
+ /*****************************
try {
//decrease the transaction reference count on index
txnContext.decreaseActiveTransactionCountOnIndexes();
} catch (HyracksDataException e) {
throw new ACIDException("failed to complete index operation", e);
}
+ *****************************/
return;
}
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (working copy)
@@ -19,6 +19,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback;
@@ -169,5 +170,14 @@
closeable.close(this);
}
}
+
+ @Override
+ public int hashCode() {
+ return jobId.getId();
+ }
+ @Override
+ public boolean equals(Object o) {
+ return (o == this);
+ }
}
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (working copy)
@@ -567,7 +567,7 @@
if (commitFlag) {
if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
try {
- txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(),
+ txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(),
entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
} catch (ACIDException e) {
try {
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (working copy)
@@ -75,6 +75,7 @@
buffer.position(0);
buffer.limit(size);
fileChannel.write(buffer);
+ fileChannel.force(false);
erase();
}
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (working copy)
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -21,7 +21,12 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -30,22 +35,25 @@
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class LogManager implements ILogManager {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
- private TransactionSubsystem provider;
+ private final TransactionSubsystem provider;
private LogManagerProperties logManagerProperties;
+ private LogPageFlushThread logPageFlusher;
/*
* the array of log pages. The number of log pages is configurable. Pages
* taken together form an in-memory log buffer.
*/
-
private IFileBasedBuffer[] logPages;
private ILogRecordHelper logRecordHelper;
@@ -54,6 +62,7 @@
* Number of log pages that constitute the in-memory log buffer.
*/
private int numLogPages;
+
/*
* Initially all pages have an owner count of 1 that is the LogManager. When
* a transaction requests to write in a log page, the owner count is
@@ -62,12 +71,11 @@
* (covering the whole log record). When the content has been put, the log
* manager computes the checksum and puts it after the content. At this
* point, the ownership count is decremented as the transaction is done with
- * using the page. When a page is full, the log manager decrements the count
- * by one indicating that it has released its ownership of the log page.
- * There could be other transaction(s) still owning the page (that is they
- * could still be mid-way putting the log content). When the ownership count
- * eventually reaches zero, the thread responsible for flushing the log page
- * is notified and the page is flushed to disk.
+ * using the page. When a page is requested to be flushed, the logPageFlusher
+ * sets the count to 0 (LOG_FLUSHER, meaning that the page is being flushed)
+ * only if the count is 1 (LOG_WRITER, meaning that no other transaction
+ * still owns the page for writing logs). After flushing the page, the
+ * logPageFlusher sets the count back to 1.
*/
private AtomicInteger[] logPageOwnerCount;
@@ -78,18 +86,16 @@
/*
* LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
- * page is maintained in a map called logPageStatus. A page is ACTIVE when
- * the LogManager can allocate space in the page for writing a log record.
- * Initially all pages are ACTIVE. As transactions fill up space by writing
- * log records, a page may not have sufficient space left for serving a
- * request by a transaction. When this happens, the page is marked INACTIVE.
- * An INACTIVE page with no owners ( logPageOwnerCount.get(<pageIndex>) ==
- * 0) indicates that the page must be flushed to disk before any other log
- * record is written on the page.F
+ * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
+ * can allocate space in the page for writing a log record. Initially all
+ * pages are ACTIVE. As transactions fill up space by writing log records,
+ * a page may not have sufficient space left for serving a request by a
+ * transaction. When this happens, the page is flushed to disk by calling
+ * logPageFlusher.requestFlush(). In requestFlush(), after the groupCommitWaitPeriod,
+ * the page status is set to INACTIVE. Then, once there is no other writer on the
+ * page (meaning the corresponding logPageOwnerCount is 1), the page is flushed
+ * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher.
*/
-
- // private Map<Integer, Integer> logPageStatus = new
- // ConcurrentHashMap<Integer, Integer>();
private AtomicInteger[] logPageStatus;
static class PageState {
@@ -98,41 +104,8 @@
}
private AtomicLong lastFlushedLsn = new AtomicLong(-1);
- private AtomicInteger lastFlushedPage = new AtomicInteger(-1);
/*
- * pendingFlushRequests is a map with key as Integer denoting the page
- * index. When a (transaction) thread discovers the need to flush a page, it
- * puts its Thread object into the corresponding value that is a
- * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
- * this map in order of page index (and circling around). The flusher thread
- * needs to flush pages in order and waits for a thread to deposit an object
- * in the blocking queue corresponding to the next page in order. A request
- * to flush a page is conveyed to the flush thread by simply depositing an
- * object in to corresponding blocking queue. It is blocking in the sense
- * that the flusher thread will continue to wait for an object to arrive in
- * the queue. The object itself is ignored by the fliusher and just acts as
- * a signal/event that a page needs to be flushed.
- */
-
- private LinkedBlockingQueue[] pendingFlushRequests;
-
- /*
- * ICommitResolver is an interface that provides an API that can answer a
- * simple boolean - Given the commit requests so far, should a page be
- * flushed. The implementation of the interface contains the logic (or you
- * can say the policy) for commit. It could be group commit in which case
- * the commit resolver may not return a true indicating that it wishes to
- * delay flushing of the page.
- */
- private ICommitResolver commitResolver;
-
- /*
- * An object that keeps track of the submitted commit requests.
- */
- private CommitRequestStatistics commitRequestStatistics;
-
- /*
* When the transaction eco-system comes to life, the log manager positions
* itself to the end of the last written log. the startingLsn represent the
* lsn value of the next log record to be written after a system (re)start.
@@ -146,16 +119,10 @@
*/
private AtomicLong lsn = new AtomicLong(0);
- /*
- * A map that tracks the flush requests submitted for each page. The
- * requests for a page are cleared when the page is flushed.
- */
- public LinkedBlockingQueue<Thread> getPendingFlushRequests(int pageIndex) {
- return pendingFlushRequests[pageIndex];
- }
+ private List<HashMap<TransactionContext, Integer>> activeTxnCountMaps;
- public void addFlushRequest(int pageIndex) {
- pendingFlushRequests[pageIndex].add(pendingFlushRequests);
+ public void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) {
+ logPageFlusher.requestFlush(pageIndex, lsn, isSynchronous);
}
public AtomicLong getLastFlushedLsn() {
@@ -233,19 +200,12 @@
numLogPages = logManagerProperties.getNumLogPages();
logPageOwnerCount = new AtomicInteger[numLogPages];
logPageStatus = new AtomicInteger[numLogPages];
- pendingFlushRequests = new LinkedBlockingQueue[numLogPages];
- if (logManagerProperties.getGroupCommitWaitPeriod() > 0) { // configure
- // the
- // Commit
- // Resolver
- commitResolver = new GroupCommitResolver(); // Group Commit is
- // enabled
- commitRequestStatistics = new CommitRequestStatistics(numLogPages);
- } else {
- commitResolver = new BasicCommitResolver(); // the basic commit
- // resolver
+
+ activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
+ for (int i = 0; i < numLogPages; i++) {
+ activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
}
- this.commitResolver.init(this); // initialize the commit resolver
+
logPages = new FileBasedBuffer[numLogPages];
/*
@@ -264,7 +224,6 @@
for (int i = 0; i < numLogPages; i++) {
logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
- pendingFlushRequests[i] = new LinkedBlockingQueue<Thread>();
}
/*
@@ -278,9 +237,9 @@
* daemon thread so that it does not stop the JVM from exiting when all
* other threads are done with their work.
*/
- LogPageFlushThread logFlusher = new LogPageFlushThread(this);
- logFlusher.setDaemon(true);
- logFlusher.start();
+ logPageFlusher = new LogPageFlushThread(this);
+ logPageFlusher.setDaemon(true);
+ logPageFlusher.start();
}
public int getLogPageIndex(long lsnValue) {
@@ -312,7 +271,7 @@
*/
private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
if (logPageStatus[pageIndex].get() == PageState.ACTIVE
- && getLogPageOwnershipCount(pageIndex).get() >= PageOwnershipStatus.LOG_WRITER) {
+ && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
return;
}
try {
@@ -338,47 +297,40 @@
*/
private long getLsn(int entrySize, byte logType) throws ACIDException {
long pageSize = logManagerProperties.getLogPageSize();
- boolean requiresFlushing = logType == LogType.COMMIT;
+
while (true) {
boolean forwardPage = false;
- boolean shouldFlushPage = false;
long old = lsn.get();
- int pageIndex = getLogPageIndex(old); // get the log page
- // corresponding to the
- // current lsn value
+
+ //get the log page corresponding to the current lsn value
+ int pageIndex = getLogPageIndex(old);
long retVal = old;
- long next = old + entrySize; // the lsn value for the next request,
- // if the current request is served.
+
+ // the lsn value for the next request if the current request is served.
+ long next = old + entrySize;
int prevPage = -1;
- if ((next - 1) / pageSize != old / pageSize // check if the log
- // record will cross
- // page boundaries, a
- // case that is not
- // allowed.
- || (next % pageSize == 0)) {
+
+ // check if the log record will cross page boundaries, a case that is not allowed.
+ if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
+
if ((old != 0 && old % pageSize == 0)) {
- retVal = old; // On second thought, this shall never be the
- // case as it means that the lsn is
- // currently at the beginning of a page and
- // we still need to forward the page which
- // means that the entrySize exceeds a log
- // page size. If this is the case, an
- // exception is thrown before calling this
- // API.
- // would remove this case.
+ // On second thought, this shall never be the case as it means that the lsn is
+ // currently at the beginning of a page and we still need to forward the page which
+ // means that the entrySize exceeds a log page size. If this is the case, an
+ // exception is thrown before calling this API. would remove this case.
+ retVal = old;
} else {
- retVal = ((old / pageSize) + 1) * pageSize; // set the lsn
- // to point to
- // the beginning
- // of the next
- // page.
+ // set the lsn to point to the beginning of the next page.
+ retVal = ((old / pageSize) + 1) * pageSize;
}
+
next = retVal;
- forwardPage = true; // as the log record shall cross log page
- // boundary, we must re-assign the lsn (so
- // that the log record begins on a different
- // location.
+
+ // as the log record would cross a log page boundary, we must re-assign the lsn so
+ // that the log record begins at a different location.
+ forwardPage = true;
+
prevPage = pageIndex;
pageIndex = getNextPageInSequence(pageIndex);
}
@@ -397,109 +349,51 @@
*/
waitUntillPageIsAvailableForWritingLog(pageIndex);
- if (!forwardPage && requiresFlushing) {
- shouldFlushPage = commitResolver.shouldCommitPage(pageIndex, this, commitRequestStatistics);
- if (shouldFlushPage) {
- next = ((next / pageSize) + 1) * pageSize; /*
- * next
- * represents the
- * next value of
- * lsn after this
- * log record has
- * been written.
- * If the page
- * needs to be
- * flushed, then
- * we do not give
- * any more LSNs
- * from this
- * page.
- */
- }
- }
- if (!lsn.compareAndSet(old, next)) { // Atomic call -> returns true
- // only when the value
- // represented by lsn is same as
- // "old". The value is updated
- // to "next".
+ if (!lsn.compareAndSet(old, next)) {
+ // Atomic call -> returns true only when the value represented by lsn is same as
+ // "old". The value is updated to "next".
continue;
}
if (forwardPage) {
- //TODO
- //this is not safe since the incoming thread may reach the same page slot with this page
- //(differ by the log buffer size)
- logPageStatus[prevPage].set(PageState.INACTIVE); // mark
- // previous
- // page
- // inactive
+ addFlushRequest(prevPage, old, false);
- /*
- * decrement on the behalf of the log manager. if there are no
- * more owners (count == 0) the page must be marked as a
- * candidate to be flushed.
- */
- int pageDirtyCount = getLogPageOwnershipCount(prevPage).decrementAndGet();
- if (pageDirtyCount == 0) {
- addFlushRequest(prevPage);
- }
-
- /*
- * The transaction thread that discovers the need to forward a
- * page is made to re-acquire a lsn.
- */
+ // The transaction thread that discovers the need to forward a
+ // page is made to re-acquire a lsn.
continue;
+
} else {
- /*
- * the transaction thread has been given a space in a log page,
- * but is made to wait until the page is available.
- */
+ // the transaction thread has been given a space in a log page,
+ // but is made to wait until the page is available.
+ // (Is this needed? when does this wait happen?)
waitUntillPageIsAvailableForWritingLog(pageIndex);
- /*
- * increment the counter as the transaction thread now holds a
- * space in the log page and hence is an owner.
- */
+
+ // increment the counter as the transaction thread now holds a
+ // space in the log page and hence is an owner.
logPageOwnerCount[pageIndex].incrementAndGet();
- }
- if (requiresFlushing) {
- if (!shouldFlushPage) {
- /*
- * the log record requires the page to be flushed but under
- * the commit policy, the flush task has been deferred. The
- * transaction thread submits its request to flush the page.
- */
- commitRequestStatistics.registerCommitRequest(pageIndex);
- } else {
- /*
- * the flush request was approved by the commit resolver.
- * Thus the page is marked INACTIVE as no more logs will be
- * written on this page. The log manager needs to release
- * its ownership. Note that transaction threads may still
- * continue to be owners of the log page till they fill up
- * the space allocated to them.
- */
- logPageStatus[pageIndex].set(PageState.INACTIVE);
- logPageOwnerCount[pageIndex].decrementAndGet(); // on
- // the
- // behalf
- // of
- // log
- // manager
+
+ // If the flusher flushed the allocated page before the count was incremented,
+ // then retry to get a new LSN. Otherwise, the log record with the allocated lsn will be lost.
+ if (lastFlushedLsn.get() >= retVal) {
+ logPageOwnerCount[pageIndex].decrementAndGet();
+ continue;
}
}
+
return retVal;
}
}
@Override
- public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
+ public void log(byte logType, TransactionContext txnCtx, int datasetId, int PKHashValue, long resourceId,
byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
LogicalLogLocator logicalLogLocator) throws ACIDException {
- /*
- * logLocator is a re-usable object that is appropriately set in each
- * invocation. If the reference is null, the log manager must throw an
- * exception
- */
+
+ HashMap<TransactionContext, Integer> map = null;
+ int activeTxnCount;
+
+ // logLocator is a re-usable object that is appropriately set in each invocation.
+ // If the reference is null, the log manager must throw an exception.
if (logicalLogLocator == null) {
throw new ACIDException(
" you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +"
@@ -519,20 +413,19 @@
// all constraints checked and we are good to go and acquire a lsn.
long previousLSN = -1;
- long currentLSN; // the will be set to the location (a long value)
- // where the log record needs to be placed.
- /*
- * The logs written by a transaction need to be linked to each other for
- * a successful rollback/recovery. However there could be multiple
- * threads operating concurrently that are part of a common transaction.
- * These threads need to synchronize and record the lsn corresponding to
- * the last log record written by (any thread of) the transaction.
- */
- synchronized (context) {
- previousLSN = context.getLastLogLocator().getLsn();
+ // this will be set to the location (a long value) where the log record needs to be placed.
+ long currentLSN;
+
+ // The logs written by a transaction need to be linked to each other for
+ // a successful rollback/recovery. However there could be multiple
+ // threads operating concurrently that are part of a common transaction.
+ // These threads need to synchronize and record the lsn corresponding to
+ // the last log record written by (any thread of) the transaction.
+ synchronized (txnCtx) {
+ previousLSN = txnCtx.getLastLogLocator().getLsn();
currentLSN = getLsn(totalLogSize, logType);
- context.setLastLSN(currentLSN);
+ txnCtx.setLastLSN(currentLSN);
if (IS_DEBUG_MODE) {
System.out.println("--------------> LSN(" + currentLSN + ") is allocated");
}
@@ -547,48 +440,37 @@
* performed correctly that is ownership is released.
*/
- boolean decremented = false; // indicates if the transaction thread
- // has release ownership of the
- // page.
- boolean addedFlushRequest = false; // indicates if the transaction
- // thread has submitted a flush
- // request.
+ // indicates if the transaction thread has released ownership of the page.
+ boolean decremented = false;
int pageIndex = (int) getLogPageIndex(currentLSN);
- /*
- * the lsn has been obtained for the log record. need to set the
- * LogLocator instance accordingly.
- */
-
+ // the lsn has been obtained for the log record. need to set the
+ // LogLocator instance accordingly.
try {
-
logicalLogLocator.setBuffer(logPages[pageIndex]);
int pageOffset = getLogPageOffset(currentLSN);
logicalLogLocator.setMemoryOffset(pageOffset);
- /*
- * write the log header.
- */
- logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
+ // write the log header.
+ logRecordHelper.writeLogHeader(logicalLogLocator, logType, txnCtx, datasetId, PKHashValue, previousLSN,
resourceId, resourceMgrId, logContentSize);
// increment the offset so that the transaction can fill up the
// content in the correct region of the allocated space.
logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
- // a COMMIT log record does not have any content
- // and hence the logger (responsible for putting the log content) is
- // not invoked.
+ // a COMMIT log record does not have any content and hence
+ // the logger (responsible for putting the log content) is not invoked.
if (logContentSize != 0) {
- logger.preLog(context, reusableLogContentObject);
+ logger.preLog(txnCtx, reusableLogContentObject);
}
if (logContentSize != 0) {
// call the logger implementation and ask to fill in the log
// record content at the allocated space.
- logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
- logger.postLog(context, reusableLogContentObject);
+ logger.log(txnCtx, logicalLogLocator, logContentSize, reusableLogContentObject);
+ logger.postLog(txnCtx, reusableLogContentObject);
if (IS_DEBUG_MODE) {
logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset()
- logRecordHelper.getLogHeaderSize(logType));
@@ -597,10 +479,8 @@
}
}
- /*
- * The log record has been written. For integrity checks, compute
- * the checksum and put it at the end of the log record.
- */
+ // The log record has been written. For integrity checks, compute
+ // the checksum and put it at the end of the log record.
int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
int length = totalLogSize - logRecordHelper.getLogChecksumSize();
long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
@@ -611,46 +491,31 @@
System.out.println("--------------> LSN(" + currentLSN + ") is written");
}
- /*
- * release the ownership as the log record has been placed in
- * created space.
- */
- int pageDirtyCount = logPageOwnerCount[pageIndex].decrementAndGet();
+ // release the ownership as the log record has been placed in created space.
+ logPageOwnerCount[pageIndex].decrementAndGet();
// indicating that the transaction thread has released ownership
decremented = true;
- /*
- * If the transaction thread happens to be the last owner of the log
- * page the page must by marked as a candidate to be flushed.
- */
- if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) {
- addFlushRequest(pageIndex);
- addedFlushRequest = true;
- }
-
- /*
- * If the log type is commit, a flush request is registered, if the
- * log record has not reached the disk. It may be possible that this
- * thread does not get CPU cycles and in-between the log record has
- * been flushed to disk because the containing log page filled up.
- */
- if (logType == LogType.COMMIT) {
- synchronized (logPages[pageIndex]) {
- while (getLastFlushedLsn().get() < currentLSN) {
- logPages[pageIndex].wait();
- }
+ if (logType == LogType.ENTITY_COMMIT) {
+ map = activeTxnCountMaps.get(pageIndex);
+ if (map.containsKey(txnCtx)) {
+ activeTxnCount = (Integer) map.get(txnCtx);
+ activeTxnCount++;
+ map.put(txnCtx, activeTxnCount);
+ } else {
+ map.put(txnCtx, 1);
}
+ addFlushRequest(pageIndex, currentLSN, false);
+ } else if (logType == LogType.COMMIT) {
+ addFlushRequest(pageIndex, currentLSN, true);
}
} catch (Exception e) {
e.printStackTrace();
- throw new ACIDException(context, "Thread: " + Thread.currentThread().getName()
+ throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
+ " logger encountered exception", e);
} finally {
- /*
- * If an exception was encountered and we did not release ownership
- */
if (!decremented) {
logPageOwnerCount[pageIndex].decrementAndGet();
}
@@ -667,9 +532,6 @@
logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
logManagerProperties.getLogPageSize());
-
- //TODO Check if this is necessary
- //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0);
}
@Override
@@ -747,16 +609,13 @@
//minimize memory allocation overhead. current code allocates the log page size per reading a log record.
byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
- // take a lock on the log page so that the page is not flushed to
- // disk interim
+
+ // take a lock on the log page so that the page is not flushed to disk interim
synchronized (logPages[pageIndex]) {
- if (lsnValue > getLastFlushedLsn().get()) { // need to check
- // again
- // (this
- // thread may have got
- // de-scheduled and must
- // refresh!)
+ // need to check again (this thread may have got de-scheduled and must refresh!)
+ if (lsnValue > getLastFlushedLsn().get()) {
+
// get the log record length
logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
byte logType = pageContent[pageOffset + 4];
@@ -765,9 +624,7 @@
int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
logRecord = new byte[logRecordSize];
- /*
- * copy the log record content
- */
+ // copy the log record content
System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
if (logicalLogLocator == null) {
@@ -790,9 +647,7 @@
}
}
- /*
- * the log record is residing on the disk, read it from there.
- */
+ // the log record is residing on the disk, read it from there.
readDiskLog(lsnValue, logicalLogLocator);
}
@@ -860,30 +715,40 @@
return logPageOwnerCount[pageIndex];
}
- public ICommitResolver getCommitResolver() {
- return commitResolver;
- }
-
- public CommitRequestStatistics getCommitRequestStatistics() {
- return commitRequestStatistics;
- }
-
public IFileBasedBuffer[] getLogPages() {
return logPages;
}
- public int getLastFlushedPage() {
- return lastFlushedPage.get();
- }
-
- public void setLastFlushedPage(int lastFlushedPage) {
- this.lastFlushedPage.set(lastFlushedPage);
- }
-
@Override
public TransactionSubsystem getTransactionSubsystem() {
return provider;
}
+
+ public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
+ TransactionContext ctx = null;
+ int count = 0;
+ int i = 0;
+
+ HashMap<TransactionContext, Integer> map = activeTxnCountMaps.get(pageIndex);
+ Set<Map.Entry<TransactionContext, Integer>> entrySet = map.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<TransactionContext, Integer> entry : entrySet) {
+ if (entry != null) {
+ if (entry.getValue() != null) {
+ count = entry.getValue();
+ }
+ if (count > 0) {
+ ctx = entry.getKey();
+ for (i = 0; i < count; i++) {
+ ctx.decreaseActiveTransactionCountOnIndexes();
+ }
+ }
+ }
+ }
+ }
+
+ map.clear();
+ }
}
/*
@@ -895,36 +760,82 @@
class LogPageFlushThread extends Thread {
private LogManager logManager;
+ /*
+ * pendingFlushRequests is a map with key as Integer denoting the page
+ * index. When a (transaction) thread discovers the need to flush a page, it
+ * puts its Thread object into the corresponding value that is a
+ * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
+ * this map in order of page index (and circling around). The flusher thread
+ * needs to flush pages in order and waits for a thread to deposit an object
+ * in the blocking queue corresponding to the next page in order. A request
+ * to flush a page is conveyed to the flush thread by simply depositing an
+ * object in to corresponding blocking queue. It is blocking in the sense
+ * that the flusher thread will continue to wait for an object to arrive in
+ * the queue. The object itself is ignored by the flusher and just acts as
+ * a signal/event that a page needs to be flushed.
+ */
+ private final LinkedBlockingQueue<Object>[] flushRequestQueue;
+ private final Object[] flushRequests;
+ private int lastFlushedPageIndex;
+ private final long groupCommitWaitPeriod;
public LogPageFlushThread(LogManager logManager) {
this.logManager = logManager;
setName("Flusher");
+ int numLogPages = logManager.getLogManagerProperties().getNumLogPages();
+ this.flushRequestQueue = new LinkedBlockingQueue[numLogPages];
+ this.flushRequests = new Object[numLogPages];
+ for (int i = 0; i < numLogPages; i++) {
+ flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
+ flushRequests[i] = new Object();
+ }
+ this.lastFlushedPageIndex = -1;
+ groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
}
+ public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
+ synchronized (logManager.getLogPage(pageIndex)) {
+ //return if flushedLSN >= lsn
+ if (logManager.getLastFlushedLsn().get() >= lsn) {
+ return;
+ }
+
+ //put a new request into the queue only if a request for this page is not already in the queue.
+ flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]);
+
+ //return if the request is asynchronous
+ if (!isSynchronous) {
+ return;
+ }
+
+ //wait until the page is flushed.
+ boolean isNotified = false;
+ while (!isNotified) {
+ try {
+ logManager.getLogPage(pageIndex).wait();
+ isNotified = true;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
@Override
public void run() {
while (true) {
try {
- int pageToFlush = logManager.getNextPageInSequence(logManager.getLastFlushedPage());
+ int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex);
- /*
- * A wait call on the linkedBLockingQueue. The flusher thread is
- * notified when an object is added to the queue. Please note
- * that each page has an associated blocking queue.
- */
- logManager.getPendingFlushRequests(pageToFlush).take();
+ // A wait call on the linkedBLockingQueue. The flusher thread is
+ // notified when an object is added to the queue. Please note
+ // that each page has an associated blocking queue.
+ flushRequestQueue[pageToFlush].take();
- /*
- * The LogFlusher was waiting for a page to be marked as a
- * candidate for flushing. Now that has happened. The thread
- * shall proceed to take a lock on the log page
- */
- synchronized (logManager.getLogPages()[pageToFlush]) {
+ synchronized (logManager.getLogPage(pageToFlush)) {
- /*
- * lock the internal state of the log manager and create a
- * log file if necessary.
- */
+ // lock the internal state of the log manager and create a
+ // log file if necessary.
int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get());
int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()
+ logManager.getLogManagerProperties().getLogPageSize());
@@ -936,198 +847,60 @@
logManager.getLogManagerProperties().getLogPageSize());
}
- logManager.getLogPage(pageToFlush).flush(); // put the
- // content to
- // disk, the
- // thread still
- // has a lock on
- // the log page
+ //#. sleep for the groupCommitWaitPeriod
+ sleep(groupCommitWaitPeriod);
- /*
- * acquire lock on the log manager as we need to update the
- * internal bookkeeping data.
- */
+ //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page.
+ logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
- // increment the last flushed lsn.
- long lastFlushedLsn = logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties()
- .getLogPageSize());
+ //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER)
+ // meaning everyone has finished writing logs on this page.
+ while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
+ sleep(0);
+ }
- /*
- * the log manager gains back ownership of the page. this is
- * reflected by incrementing the owner count of the page.
- * recall that when the page is begin flushed the owner
- * count is actually 0 Value of zero implicitly indicates
- * that the page is operated upon by the log flusher thread.
- */
- logManager.getLogPageOwnershipCount(pageToFlush).incrementAndGet();
+ //#. set the logPageOwnerCount to 0 (LOG_FLUSHER)
+ // meaning it is flushing.
+ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
- /*
- * get the number of log buffers that have been written so
- * far. A log buffer = number of log pages * size of a log
- * page
- */
- int numCycles = (int) lastFlushedLsn / logManager.getLogManagerProperties().getLogBufferSize();
- if (lastFlushedLsn % logManager.getLogManagerProperties().getLogBufferSize() == 0) {
- numCycles--;
- }
+ // put the content to disk (the thread still has a lock on the log page)
+ logManager.getLogPage(pageToFlush).flush();
- /*
- * Map the log page to a new region in the log file.
- */
+ // increment the last flushed lsn and lastFlushedPage
+ logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
+ lastFlushedPageIndex = pageToFlush;
+ // decrement activeTxnCountOnIndexes
+ logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
+
+ // reset the count to 1
+ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
+
+ // Map the log page to a new region in the log file.
long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
+ logManager.getLogManagerProperties().getLogBufferSize();
- /*
- * long nextPos = (numCycles + 1)
- * logManager.getLogManagerProperties() .getLogBufferSize()
- * + pageToFlush logManager.getLogManagerProperties()
- * .getLogPageSize();
- */
logManager.resetLogPage(nextWritePosition, pageToFlush);
// mark the page as ACTIVE
logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
- // notify all waiting (transaction) threads.
- // Transaction thread may be waiting for the page to be
- // available or may have a commit log record on the page
- // that got flushed.
- logManager.getLogPages()[pageToFlush].notifyAll();
- logManager.setLastFlushedPage(pageToFlush);
+ //#. check the queue for another flush request on the same log buffer.
+ // If there is another request, then simply remove it.
+ if (flushRequestQueue[pageToFlush].peek() != null) {
+ flushRequestQueue[pageToFlush].take();
+ }
+ // notify all waiting (transaction) threads.
+ logManager.getLogPage(pageToFlush).notifyAll();
}
} catch (IOException ioe) {
ioe.printStackTrace();
throw new Error(" exception in flushing log page", ioe);
} catch (InterruptedException e) {
e.printStackTrace();
- break; // must break from the loop as the exception indicates
- // some thing horrendous has happened elsewhere
+ break;
}
}
}
-}
-
-/*
- * TODO: By default the commit policy is to commit at each request and not have
- * a group commit. The following code needs to change to support group commit.
- * The code for group commit has not been tested thoroughly and is under
- * development.
- */
-class BasicCommitResolver implements ICommitResolver {
-
- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
- CommitRequestStatistics commitRequestStatistics) {
- return true;
- }
-
- public void init(LogManager logManager) {
- }
-}
-
-class GroupCommitResolver implements ICommitResolver {
-
- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
- CommitRequestStatistics commitRequestStatistics) {
- long maxCommitWait = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
- long timestamp = commitRequestStatistics.getPageLevelLastCommitRequestTimestamp(pageIndex);
- if (timestamp == -1) {
- if (maxCommitWait == 0) {
- return true;
- } else {
- timestamp = System.currentTimeMillis();
- }
- }
- long currenTime = System.currentTimeMillis();
- if (currenTime - timestamp > maxCommitWait) {
- return true;
- }
- return false;
- }
-
- public void init(LogManager logManager) {
- GroupCommitHandlerThread groupCommitHandler = new GroupCommitHandlerThread(logManager);
- groupCommitHandler.setDaemon(true);
- groupCommitHandler.start();
- }
-
- class GroupCommitHandlerThread extends Thread {
-
- private LogManager logManager;
-
- public GroupCommitHandlerThread(LogManager logManager) {
- this.logManager = logManager;
- setName("Group Commit Handler");
- }
-
- @Override
- public void run() {
- int pageIndex = -1;
- while (true) {
- pageIndex = logManager.getNextPageInSequence(pageIndex);
- long lastCommitRequeestTimestamp = logManager.getCommitRequestStatistics()
- .getPageLevelLastCommitRequestTimestamp(pageIndex);
- if (lastCommitRequeestTimestamp != -1
- && System.currentTimeMillis() - lastCommitRequeestTimestamp > logManager
- .getLogManagerProperties().getGroupCommitWaitPeriod()) {
- int dirtyCount = logManager.getLogPageOwnershipCount(pageIndex).decrementAndGet();
- if (dirtyCount == 0) {
- try {
- logManager.getLogPageStatus(pageIndex).set(LogManager.PageState.INACTIVE);
- logManager.getPendingFlushRequests(pageIndex).put(Thread.currentThread());
- } catch (InterruptedException e) {
- e.printStackTrace();
- break;
- }
- logManager.getCommitRequestStatistics().committedPage(pageIndex);
- }
- }
- }
- }
- }
-
-}
-
-interface ICommitResolver {
- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
- CommitRequestStatistics commitRequestStatistics);
-
- public void init(LogManager logManager);
-}
-
-/**
- * Represents a collection of all commit requests by transactions for each log
- * page. The requests are accumulated until the commit policy triggers a flush
- * of the corresponding log page. Upon a flush of a page, all commit requests
- * for the page are cleared.
- */
-class CommitRequestStatistics {
-
- AtomicInteger[] pageLevelCommitRequestCount;
- AtomicLong[] pageLevelLastCommitRequestTimestamp;
-
- public CommitRequestStatistics(int numPages) {
- pageLevelCommitRequestCount = new AtomicInteger[numPages];
- pageLevelLastCommitRequestTimestamp = new AtomicLong[numPages];
- for (int i = 0; i < numPages; i++) {
- pageLevelCommitRequestCount[i] = new AtomicInteger(0);
- pageLevelLastCommitRequestTimestamp[i] = new AtomicLong(-1L);
- }
- }
-
- public void registerCommitRequest(int pageIndex) {
- pageLevelCommitRequestCount[pageIndex].incrementAndGet();
- pageLevelLastCommitRequestTimestamp[pageIndex].set(System.currentTimeMillis());
- }
-
- public long getPageLevelLastCommitRequestTimestamp(int pageIndex) {
- return pageLevelLastCommitRequestTimestamp[pageIndex].get();
- }
-
- public void committedPage(int pageIndex) {
- pageLevelCommitRequestCount[pageIndex].set(0);
- pageLevelLastCommitRequestTimestamp[pageIndex].set(-1L);
- }
-
-}
+}
\ No newline at end of file
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (working copy)
@@ -152,6 +152,9 @@
case LogType.UPDATE:
logTypeDisplay = "UPDATE";
break;
+ case LogType.ENTITY_COMMIT:
+ logTypeDisplay = "ENTITY_COMMIT";
+ break;
}
builder.append(" LSN : ").append(logicalLogLocator.getLsn());
builder.append(" Log Type : ").append(logTypeDisplay);
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (working copy)
@@ -18,5 +18,6 @@
public static final byte UPDATE = 0;
public static final byte COMMIT = 1;
+ public static final byte ENTITY_COMMIT = 2;
}
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (working copy)
@@ -1,5 +1,5 @@
/*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
@@ -41,7 +41,7 @@
private int logPageSize = 128 * 1024; // 128 KB
private int numLogPages = 8; // number of log pages in the log buffer.
- private long groupCommitWaitPeriod = 0; // time in milliseconds for which a
+ private long groupCommitWaitPeriod = 1; // time in milliseconds for which a
// commit record will wait before
// the housing page is marked for
// flushing.
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (working copy)
@@ -184,6 +184,7 @@
break;
case LogType.COMMIT:
+ case LogType.ENTITY_COMMIT:
tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
logRecordHelper.getDatasetId(currentLogLocator),
logRecordHelper.getPKHashValue(currentLogLocator));
@@ -218,6 +219,7 @@
IIndex index = null;
LocalResource localResource = null;
ILocalResourceMetadata localResourceMetadata = null;
+ List<Long> resourceIdList = new ArrayList<Long>();
//#. get indexLifeCycleManager
IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
@@ -272,6 +274,8 @@
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
localResource.getResourceName(), localResource.getPartition());
indexLifecycleManager.register(resourceId, index);
+ indexLifecycleManager.open(resourceId);
+ resourceIdList.add(resourceId);
}
/***************************************************/
@@ -300,6 +304,7 @@
break;
case LogType.COMMIT:
+ case LogType.ENTITY_COMMIT:
//do nothing
break;
@@ -308,6 +313,11 @@
}
}
+ //close all indexes
+ for (long r : resourceIdList) {
+ indexLifecycleManager.close(r);
+ }
+
JobIdFactory.initJobId(maxJobId);
}
@@ -539,6 +549,7 @@
break;
case LogType.COMMIT:
+ case LogType.ENTITY_COMMIT:
undoLSNSet = loserTxnTable.get(tempKeyTxnId);
if (undoLSNSet != null) {
loserTxnTable.remove(tempKeyTxnId);
Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java
===================================================================
--- asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (revision 1194)
+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (working copy)
@@ -42,6 +42,16 @@
List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
for (CompilationUnit cUnit : cUnits) {
File testFile = tcCtx.getTestFile(cUnit);
+
+ /*****************
+ if (!testFile.getAbsolutePath().contains("meta09.aql")) {
+ System.out.println(testFile.getAbsolutePath());
+ continue;
+ }
+ System.out.println(testFile.getAbsolutePath());
+ *****************/
+
+
File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
File actualFile = new File(PATH_ACTUAL + File.separator
+ tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
===================================================================
--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1194)
+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
@@ -95,9 +95,10 @@
File testFile = tcCtx.getTestFile(cUnit);
/*************** to avoid run failure cases ****************
- if (!testFile.getAbsolutePath().contains("index-selection/")) {
+ if (!testFile.getAbsolutePath().contains("query-issue205.aql")) {
continue;
}
+ System.out.println(testFile.getAbsolutePath());
************************************************************/
File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
Index: diff_file
===================================================================
--- diff_file (revision 1194)
+++ diff_file (working copy)
@@ -1,2098 +1,1252 @@
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy)
-@@ -25,8 +25,11 @@
- import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
- import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
- import edu.uci.ics.asterix.metadata.entities.Dataverse;
-+import edu.uci.ics.asterix.om.base.AInt32;
-+import edu.uci.ics.asterix.om.base.AMutableInt32;
- import edu.uci.ics.asterix.om.base.ARecord;
- import edu.uci.ics.asterix.om.base.AString;
-+import edu.uci.ics.asterix.om.types.BuiltinType;
- import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
- import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (working copy)
+@@ -103,12 +103,14 @@
+ //for entity-level commit
+ if (PKHashVal != -1) {
+ transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
++ /*****************************
+ try {
+ //decrease the transaction reference count on index
+ txnContext.decreaseActiveTransactionCountOnIndexes();
+ } catch (HyracksDataException e) {
+ throw new ACIDException("failed to complete index operation", e);
+ }
++ *****************************/
+ return;
+ }
-@@ -40,12 +43,18 @@
- // Payload field containing serialized Dataverse.
- public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
-
-+ private AMutableInt32 aInt32;
-+ protected ISerializerDeserializer<AInt32> aInt32Serde;
-+
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
-
-+ @SuppressWarnings("unchecked")
- public DataverseTupleTranslator(boolean getTuple) {
- super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
-+ aInt32 = new AMutableInt32(-1);
-+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- }
-
- @Override
-@@ -57,7 +66,8 @@
- DataInput in = new DataInputStream(stream);
- ARecord dataverseRecord = recordSerDes.deserialize(in);
- return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
-- ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
-+ ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
-+ ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
- }
-
- @Override
-@@ -88,6 +98,12 @@
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-
-+ // write field 3
-+ fieldValue.reset();
-+ aInt32.setValue(instance.getPendingOp());
-+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
-+ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
-+
- recordBuilder.write(tupleBuilder.getDataOutput(), true);
- tupleBuilder.addFieldEndOffset();
-
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy)
-@@ -77,9 +77,9 @@
- protected ISerializerDeserializer<AInt32> aInt32Serde;
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (working copy)
+@@ -19,6 +19,7 @@
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
++import java.util.concurrent.atomic.AtomicInteger;
- @SuppressWarnings("unchecked")
-- public DatasetTupleTranslator(boolean getTuple) {
-+ public DatasetTupleTranslator(boolean getTuple) {
- super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
-- aInt32 = new AMutableInt32(-1);
-+ aInt32 = new AMutableInt32(-1);
- aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- }
-
-@@ -104,8 +104,10 @@
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
- DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
- IDatasetDetails datasetDetails = null;
-- int datasetId = ((AInt32) datasetRecord
-+ int datasetId = ((AInt32) datasetRecord
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
-+ int pendingOp = ((AInt32) datasetRecord
-+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
- switch (datasetType) {
- case FEED:
- case INTERNAL: {
-@@ -197,7 +199,7 @@
- }
- datasetDetails = new ExternalDatasetDetails(adapter, properties);
+ import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+ import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback;
+@@ -169,5 +170,14 @@
+ closeable.close(this);
}
-- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
-+ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
}
++
++ @Override
++ public int hashCode() {
++ return jobId.getId();
++ }
- @Override
-@@ -248,13 +250,19 @@
- aString.setValue(Calendar.getInstance().getTime().toString());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
--
-+
- // write field 8
- fieldValue.reset();
- aInt32.setValue(dataset.getDatasetId());
- aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
--
-+
-+ // write field 9
-+ fieldValue.reset();
-+ aInt32.setValue(dataset.getPendingOp());
-+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
-+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
-+
- // write record
- recordBuilder.write(tupleBuilder.getDataOutput(), true);
- tupleBuilder.addFieldEndOffset();
-@@ -290,13 +298,15 @@
- fieldValue.reset();
- aString.setValue(name);
- stringSerde.serialize(aString, fieldValue.getDataOutput());
-- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
-+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
-+ fieldValue);
-
- // write field 1
- fieldValue.reset();
- aString.setValue(value);
- stringSerde.serialize(aString, fieldValue.getDataOutput());
-- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
-+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
-+ fieldValue);
-
- propertyRecordBuilder.write(out, true);
- }
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
++ @Override
++ public boolean equals(Object o) {
++ return (o == this);
++ }
+ }
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy)
-@@ -96,13 +96,15 @@
- }
- Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
- .getBoolean();
-+ int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
-+ .getIntegerValue();
- // Check if there is a gram length as well.
- int gramLength = -1;
- int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
- if (gramLenPos >= 0) {
- gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
- }
-- return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
-+ return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
- }
-
- @Override
-@@ -174,7 +176,12 @@
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-
-- // write optional field 7
-+ // write field 7
-+ fieldValue.reset();
-+ intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
-+ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
-+
-+ // write optional field 8
- if (instance.getGramLength() > 0) {
- fieldValue.reset();
- nameValue.reset();
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (working copy)
+@@ -567,7 +567,7 @@
+ if (commitFlag) {
+ if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
+ try {
+- txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(),
++ txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(),
+ entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
+ } catch (ACIDException e) {
+ try {
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy)
-@@ -129,7 +129,7 @@
-
- public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
- private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
-- private final MetadataTransactionContext mdTxnCtx;
-+ private MetadataTransactionContext mdTxnCtx;
- private boolean isWriteTransaction;
- private Map<String, String[]> stores;
- private Map<String, String> config;
-@@ -156,8 +156,7 @@
- return config;
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (working copy)
+@@ -75,6 +75,7 @@
+ buffer.position(0);
+ buffer.limit(size);
+ fileChannel.write(buffer);
++ fileChannel.force(false);
+ erase();
}
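
The added fileChannel.force(false) is what makes this flush durable: write() only hands the page to the OS, while force() pushes it to the storage device (the false argument skips file-metadata sync). A minimal stand-alone sketch of the same write-then-force pattern; the file name is illustrative only:

    import java.io.RandomAccessFile;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class DurableLogAppend {
        public static void main(String[] args) throws Exception {
            RandomAccessFile raf = new RandomAccessFile("sample.log", "rw");
            try {
                FileChannel fileChannel = raf.getChannel();
                ByteBuffer buffer = ByteBuffer.wrap("one log record".getBytes("UTF-8"));
                fileChannel.position(fileChannel.size()); // append at the end of the file
                fileChannel.write(buffer);                // hand the bytes to the OS page cache
                fileChannel.force(false);                 // sync file contents (not metadata) to disk
            } finally {
                raf.close();
            }
        }
    }
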
-- public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
-- this.mdTxnCtx = mdTxnCtx;
-+ public AqlMetadataProvider(Dataverse defaultDataverse) {
- this.defaultDataverse = defaultDataverse;
- this.stores = AsterixProperties.INSTANCE.getStores();
- }
-@@ -181,6 +180,10 @@
- public void setWriterFactory(IAWriterFactory writerFactory) {
- this.writerFactory = writerFactory;
- }
-+
-+ public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
-+ this.mdTxnCtx = mdTxnCtx;
-+ }
-
- public MetadataTransactionContext getMetadataTxnContext() {
- return mdTxnCtx;
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy)
-@@ -35,15 +35,18 @@
- private final DatasetType datasetType;
- private IDatasetDetails datasetDetails;
- private final int datasetId;
-+ // Type of pending operations with respect to atomic DDL operation
-+ private final int pendingOp;
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (working copy)
+@@ -1,5 +1,5 @@
+ /*
+- * Copyright 2009-2010 by The Regents of the University of California
++ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+@@ -21,7 +21,12 @@
+ import java.io.RandomAccessFile;
+ import java.nio.ByteBuffer;
+ import java.nio.channels.FileChannel;
++import java.util.ArrayList;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
+ import java.util.Properties;
++import java.util.Set;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+@@ -30,22 +35,25 @@
- public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
-- DatasetType datasetType, int datasetId) {
-+ DatasetType datasetType, int datasetId, int pendingOp) {
- this.dataverseName = dataverseName;
- this.datasetName = datasetName;
- this.itemTypeName = itemTypeName;
- this.datasetType = datasetType;
- this.datasetDetails = datasetDetails;
- this.datasetId = datasetId;
-+ this.pendingOp = pendingOp;
- }
+ import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+ import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
++import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
- public String getDataverseName() {
-@@ -73,6 +76,10 @@
- public int getDatasetId() {
- return datasetId;
- }
-+
-+ public int getPendingOp() {
-+ return pendingOp;
-+ }
+ public class LogManager implements ILogManager {
- @Override
- public Object addToCache(MetadataCache cache) {
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy)
-@@ -45,9 +45,11 @@
- private final boolean isPrimaryIndex;
- // Specific to NGRAM indexes.
- private final int gramLength;
-+ // Type of pending operations with respect to atomic DDL operation
-+ private final int pendingOp;
+ public static final boolean IS_DEBUG_MODE = false;//true
+ private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
+- private TransactionSubsystem provider;
++ private final TransactionSubsystem provider;
+ private LogManagerProperties logManagerProperties;
++ private LogPageFlushThread logPageFlusher;
- public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
-- List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
-+ List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
- this.dataverseName = dataverseName;
- this.datasetName = datasetName;
- this.indexName = indexName;
-@@ -55,10 +57,11 @@
- this.keyFieldNames = keyFieldNames;
- this.gramLength = gramLength;
- this.isPrimaryIndex = isPrimaryIndex;
-+ this.pendingOp = pendingOp;
- }
+ /*
+ * the array of log pages. The number of log pages is configurable. Pages
+ * taken together form an in-memory log buffer.
+ */
+-
+ private IFileBasedBuffer[] logPages;
- public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
-- List<String> keyFieldNames, boolean isPrimaryIndex) {
-+ List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
- this.dataverseName = dataverseName;
- this.datasetName = datasetName;
- this.indexName = indexName;
-@@ -66,6 +69,7 @@
- this.keyFieldNames = keyFieldNames;
- this.gramLength = -1;
- this.isPrimaryIndex = isPrimaryIndex;
-+ this.pendingOp = pendingOp;
- }
+ private ILogRecordHelper logRecordHelper;
+@@ -54,6 +62,7 @@
+ * Number of log pages that constitute the in-memory log buffer.
+ */
+ private int numLogPages;
++
+ /*
+ * Initially all pages have an owner count of 1 that is the LogManager. When
+ * a transaction requests to write in a log page, the owner count is
+@@ -62,12 +71,11 @@
+ * (covering the whole log record). When the content has been put, the log
+ * manager computes the checksum and puts it after the content. At this
+ * point, the ownership count is decremented as the transaction is done with
+- * using the page. When a page is full, the log manager decrements the count
+- * by one indicating that it has released its ownership of the log page.
+- * There could be other transaction(s) still owning the page (that is they
+- * could still be mid-way putting the log content). When the ownership count
+- * eventually reaches zero, the thread responsible for flushing the log page
+- * is notified and the page is flushed to disk.
++ * using the page. When a page is requested to be flushed, logPageFlusher
++ * sets the count to 0 (LOG_FLUSHER: meaning that the page is being flushed)
++ * only if the count is 1 (LOG_WRITER: meaning that no other transaction
++ * still owns the page for writing logs). After flushing the page,
++ * logPageFlusher resets the count to 1.
+ */
+ private AtomicInteger[] logPageOwnerCount;
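
A minimal sketch of the ownership handshake the comment above describes; the constants and method names are illustrative, not the patch's actual code. Writer threads bump the count around their copy into the page, and the flusher may only claim a page whose count has fallen back to LOG_WRITER:

    import java.util.concurrent.atomic.AtomicInteger;

    class LogPageOwnershipSketch {
        static final int LOG_FLUSHER = 0; // page is currently being flushed
        static final int LOG_WRITER = 1;  // base ownership held by the LogManager

        private final AtomicInteger ownerCount = new AtomicInteger(LOG_WRITER);

        // A transaction thread takes a share of the page before copying its log record in.
        void beginWrite() {
            ownerCount.incrementAndGet();
        }

        // ... and releases it once the record and its checksum are in place.
        void endWrite() {
            ownerCount.decrementAndGet();
        }

        // The flusher claims the page only when no writer is active (count == LOG_WRITER).
        boolean tryClaimForFlush() {
            return ownerCount.compareAndSet(LOG_WRITER, LOG_FLUSHER);
        }

        // After the flush, the page is handed back to the writers.
        void releaseAfterFlush() {
            ownerCount.set(LOG_WRITER);
        }
    }
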
- public String getDataverseName() {
-@@ -95,6 +99,10 @@
- public boolean isPrimaryIndex() {
- return isPrimaryIndex;
- }
-+
-+ public int getPendingOp() {
-+ return pendingOp;
-+ }
+@@ -78,18 +86,16 @@
- public boolean isSecondaryIndex() {
- return !isPrimaryIndex();
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy)
-@@ -27,10 +27,12 @@
- // Enforced to be unique within an Asterix cluster..
- private final String dataverseName;
- private final String dataFormat;
-+ private final int pendingOp;
+ /*
+ * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
+- * page is maintained in a map called logPageStatus. A page is ACTIVE when
+- * the LogManager can allocate space in the page for writing a log record.
+- * Initially all pages are ACTIVE. As transactions fill up space by writing
+- * log records, a page may not have sufficient space left for serving a
+- * request by a transaction. When this happens, the page is marked INACTIVE.
+- * An INACTIVE page with no owners ( logPageOwnerCount.get(<pageIndex>) ==
+- * 0) indicates that the page must be flushed to disk before any other log
+- * record is written on the page.F
++ * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
++ * can allocate space in the page for writing a log record. Initially all
++ * pages are ACTIVE. As transactions fill up space by writing log records,
++ * a page may not have sufficient space left for serving a request by a
++ * transaction. When this happens, the page is flushed to disk by calling
++ * logPageFlusher.requestFlush(). In requestFlush(), after groupCommitWaitTime,
++ * the page status is set to INACTIVE. Then, once there is no more writer on the
++ * page (meaning the corresponding logPageOwnerCount is 1), the page is flushed
++ * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher.
+ */
+-
+- // private Map<Integer, Integer> logPageStatus = new
+- // ConcurrentHashMap<Integer, Integer>();
+ private AtomicInteger[] logPageStatus;
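
A hedged sketch of the flush sequencing described above; the sleep-based waiting and helper names are simplifications under stated assumptions, the real scheduling lives in LogPageFlushThread. The flusher waits out the group-commit window, closes the page to new writers, waits for the last writer to leave, flushes, and reopens the page:

    import java.util.concurrent.atomic.AtomicInteger;

    class LogPageFlushSketch {
        static final int ACTIVE = 0, INACTIVE = 1;        // page status
        static final int LOG_FLUSHER = 0, LOG_WRITER = 1; // ownership count values

        private final AtomicInteger status = new AtomicInteger(ACTIVE);
        private final AtomicInteger ownerCount = new AtomicInteger(LOG_WRITER);

        void flushPage(long groupCommitWaitTimeMillis) throws InterruptedException {
            // Let other transactions piggyback their commit records on this page (group commit).
            Thread.sleep(groupCommitWaitTimeMillis);

            // Stop admitting new writers on this page.
            status.set(INACTIVE);

            // Wait until the last writer has released the page, then claim it for flushing.
            while (!ownerCount.compareAndSet(LOG_WRITER, LOG_FLUSHER)) {
                Thread.yield();
            }

            // ... write the page through the FileChannel and force() it here ...

            // Hand the page back to the writers and reopen it.
            ownerCount.set(LOG_WRITER);
            status.set(ACTIVE);
        }
    }
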
-- public Dataverse(String dataverseName, String format) {
-+ public Dataverse(String dataverseName, String format, int pendingOp) {
- this.dataverseName = dataverseName;
- this.dataFormat = format;
-+ this.pendingOp = pendingOp;
+ static class PageState {
+@@ -98,41 +104,8 @@
}
- public String getDataverseName() {
-@@ -40,6 +42,10 @@
- public String getDataFormat() {
- return dataFormat;
- }
-+
-+ public int getPendingOp() {
-+ return pendingOp;
-+ }
+ private AtomicLong lastFlushedLsn = new AtomicLong(-1);
+- private AtomicInteger lastFlushedPage = new AtomicInteger(-1);
- @Override
- public Object addToCache(MetadataCache cache) {
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy)
-@@ -25,6 +25,7 @@
- import edu.uci.ics.asterix.common.exceptions.AsterixException;
- import edu.uci.ics.asterix.common.functions.FunctionSignature;
- import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
- import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
- import edu.uci.ics.asterix.metadata.api.IMetadataNode;
- import edu.uci.ics.asterix.metadata.api.IValueExtractor;
-@@ -160,7 +161,7 @@
- // Add the primary index for the dataset.
- InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
- Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
-- dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
-+ dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
- addIndex(jobId, primaryIndex);
- ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
- dataset.getDatasetName());
-@@ -260,7 +261,7 @@
- IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
- // TODO: fix exceptions once new BTree exception model is in hyracks.
- indexAccessor.insert(tuple);
- //TODO: extract the key from the tuple and get the PKHashValue from the key.
-@@ -536,7 +537,7 @@
- // The transaction with txnId will have an S lock on the
- // resource. Note that lock converters have a higher priority than
- // regular waiters in the LockManager.
-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
- indexAccessor.delete(tuple);
- //TODO: extract the key from the tuple and get the PKHashValue from the key.
- //check how to get the oldValue.
-@@ -803,7 +804,9 @@
- private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
- IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
-+ //#. currently lock is not needed to access any metadata
-+ // since the non-compatible concurrent access is always protected by the latch in the MetadataManager.
-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
- IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
- long resourceID = index.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy)
-@@ -20,6 +20,11 @@
- import edu.uci.ics.asterix.metadata.MetadataCache;
-
- public interface IMetadataEntity extends Serializable {
-+
-+ public static final int PENDING_NO_OP = 0;
-+ public static final int PENDING_ADD_OP = 1;
-+ public static final int PENDING_DROP_OP = 2;
-+
- Object addToCache(MetadataCache cache);
-
- Object dropFromCache(MetadataCache cache);
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy)
-@@ -17,6 +17,8 @@
-
- import java.rmi.RemoteException;
- import java.util.List;
-+import java.util.concurrent.locks.ReadWriteLock;
-+import java.util.concurrent.locks.ReentrantReadWriteLock;
-
- import edu.uci.ics.asterix.common.functions.FunctionSignature;
- import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
-@@ -79,11 +81,10 @@
- public class MetadataManager implements IMetadataManager {
- // Set in init().
- public static MetadataManager INSTANCE;
+ /*
+- * pendingFlushRequests is a map with key as Integer denoting the page
+- * index. When a (transaction) thread discovers the need to flush a page, it
+- * puts its Thread object into the corresponding value that is a
+- * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
+- * this map in order of page index (and circling around). The flusher thread
+- * needs to flush pages in order and waits for a thread to deposit an object
+- * in the blocking queue corresponding to the next page in order. A request
+- * to flush a page is conveyed to the flush thread by simply depositing an
+- * object in to corresponding blocking queue. It is blocking in the sense
+- * that the flusher thread will continue to wait for an object to arrive in
+- * the queue. The object itself is ignored by the fliusher and just acts as
+- * a signal/event that a page needs to be flushed.
+- */
-
- private final MetadataCache cache = new MetadataCache();
- private IAsterixStateProxy proxy;
- private IMetadataNode metadataNode;
+- private LinkedBlockingQueue[] pendingFlushRequests;
-
-+
- public MetadataManager(IAsterixStateProxy proxy) {
- if (proxy == null) {
- throw new Error("Null proxy given to MetadataManager.");
-@@ -206,11 +207,14 @@
+- /*
+- * ICommitResolver is an interface that provides an API that can answer a
+- * simple boolean - Given the commit requests so far, should a page be
+- * flushed. The implementation of the interface contains the logic (or you
+- * can say the policy) for commit. It could be group commit in which case
+- * the commit resolver may not return a true indicating that it wishes to
+- * delay flushing of the page.
+- */
+- private ICommitResolver commitResolver;
+-
+- /*
+- * An object that keeps track of the submitted commit requests.
+- */
+- private CommitRequestStatistics commitRequestStatistics;
+-
+- /*
+ * When the transaction eco-system comes to life, the log manager positions
+ * itself to the end of the last written log. the startingLsn represent the
+ * lsn value of the next log record to be written after a system (re)start.
+@@ -146,16 +119,10 @@
+ */
+ private AtomicLong lsn = new AtomicLong(0);
- @Override
- public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
-+ // add dataset into metadataNode
- try {
- metadataNode.addDataset(ctx.getJobId(), dataset);
- } catch (RemoteException e) {
- throw new MetadataException(e);
- }
-+
-+ // reflect the dataset into the cache
- ctx.addDataset(dataset);
+- /*
+- * A map that tracks the flush requests submitted for each page. The
+- * requests for a page are cleared when the page is flushed.
+- */
+- public LinkedBlockingQueue<Thread> getPendingFlushRequests(int pageIndex) {
+- return pendingFlushRequests[pageIndex];
+- }
++ private List<HashMap<TransactionContext, Integer>> activeTxnCountMaps;
+
+- public void addFlushRequest(int pageIndex) {
+- pendingFlushRequests[pageIndex].add(pendingFlushRequests);
++ public void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) {
++ logPageFlusher.requestFlush(pageIndex, lsn, isSynchronous);
}
-@@ -585,4 +589,5 @@
+ public AtomicLong getLastFlushedLsn() {
+@@ -233,19 +200,12 @@
+ numLogPages = logManagerProperties.getNumLogPages();
+ logPageOwnerCount = new AtomicInteger[numLogPages];
+ logPageStatus = new AtomicInteger[numLogPages];
+- pendingFlushRequests = new LinkedBlockingQueue[numLogPages];
+- if (logManagerProperties.getGroupCommitWaitPeriod() > 0) { // configure
+- // the
+- // Commit
+- // Resolver
+- commitResolver = new GroupCommitResolver(); // Group Commit is
+- // enabled
+- commitRequestStatistics = new CommitRequestStatistics(numLogPages);
+- } else {
+- commitResolver = new BasicCommitResolver(); // the basic commit
+- // resolver
++
++ activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
++ for (int i = 0; i < numLogPages; i++) {
++ activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
}
- return adapter;
- }
+- this.commitResolver.init(this); // initialize the commit resolver
+
- }
-\ No newline at end of file
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy)
-@@ -19,6 +19,7 @@
+ logPages = new FileBasedBuffer[numLogPages];
- import edu.uci.ics.asterix.common.functions.FunctionSignature;
- import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
- import edu.uci.ics.asterix.metadata.entities.Dataset;
- import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
- import edu.uci.ics.asterix.metadata.entities.Datatype;
-@@ -104,19 +105,19 @@
- }
+ /*
+@@ -264,7 +224,6 @@
+ for (int i = 0; i < numLogPages; i++) {
+ logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
+ logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
+- pendingFlushRequests[i] = new LinkedBlockingQueue<Thread>();
+ }
- public void dropDataset(String dataverseName, String datasetName) {
-- Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
-+ Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
- droppedCache.addDatasetIfNotExists(dataset);
- logAndApply(new MetadataLogicalOperation(dataset, false));
+ /*
+@@ -278,9 +237,9 @@
+ * daemon thread so that it does not stop the JVM from exiting when all
+ * other threads are done with their work.
+ */
+- LogPageFlushThread logFlusher = new LogPageFlushThread(this);
+- logFlusher.setDaemon(true);
+- logFlusher.start();
++ logPageFlusher = new LogPageFlushThread(this);
++ logPageFlusher.setDaemon(true);
++ logPageFlusher.start();
}
- public void dropIndex(String dataverseName, String datasetName, String indexName) {
-- Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
-+ Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
- droppedCache.addIndexIfNotExists(index);
- logAndApply(new MetadataLogicalOperation(index, false));
- }
-
- public void dropDataverse(String dataverseName) {
-- Dataverse dataverse = new Dataverse(dataverseName, null);
-+ Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
- droppedCache.addDataverseIfNotExists(dataverse);
- logAndApply(new MetadataLogicalOperation(dataverse, false));
- }
-@@ -162,7 +163,7 @@
+ public int getLogPageIndex(long lsnValue) {
+@@ -312,7 +271,7 @@
+ */
+ private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
+ if (logPageStatus[pageIndex].get() == PageState.ACTIVE
+- && getLogPageOwnershipCount(pageIndex).get() >= PageOwnershipStatus.LOG_WRITER) {
++ && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
+ return;
}
- return droppedCache.getDataset(dataverseName, datasetName) != null;
- }
--
+ try {
+@@ -338,47 +297,40 @@
+ */
+ private long getLsn(int entrySize, byte logType) throws ACIDException {
+ long pageSize = logManagerProperties.getLogPageSize();
+- boolean requiresFlushing = logType == LogType.COMMIT;
+
- public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
- if (droppedCache.getDataverse(dataverseName) != null) {
- return true;
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy)
-@@ -80,10 +80,11 @@
- public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
- public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
- public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
-+ public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
+ while (true) {
+ boolean forwardPage = false;
+- boolean shouldFlushPage = false;
+ long old = lsn.get();
+- int pageIndex = getLogPageIndex(old); // get the log page
+- // corresponding to the
+- // current lsn value
++
++ //get the log page corresponding to the current lsn value
++ int pageIndex = getLogPageIndex(old);
+ long retVal = old;
+- long next = old + entrySize; // the lsn value for the next request,
+- // if the current request is served.
++
++ // the lsn value for the next request if the current request is served.
++ long next = old + entrySize;
+ int prevPage = -1;
+- if ((next - 1) / pageSize != old / pageSize // check if the log
+- // record will cross
+- // page boundaries, a
+- // case that is not
+- // allowed.
+- || (next % pageSize == 0)) {
++
++ // check if the log record will cross page boundaries, a case that is not allowed.
++ if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
++
+ if ((old != 0 && old % pageSize == 0)) {
+- retVal = old; // On second thought, this shall never be the
+- // case as it means that the lsn is
+- // currently at the beginning of a page and
+- // we still need to forward the page which
+- // means that the entrySize exceeds a log
+- // page size. If this is the case, an
+- // exception is thrown before calling this
+- // API.
+- // would remove this case.
++ // On second thought, this shall never be the case as it means that the lsn is
++ // currently at the beginning of a page and we still need to forward the page which
++ // means that the entrySize exceeds a log page size. If this is the case, an
++ // exception is thrown before calling this API, so this case could be removed.
++ retVal = old;
- private static final ARecordType createDataverseRecordType() {
-- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
-- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
-+ return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
-+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
- }
+ } else {
+- retVal = ((old / pageSize) + 1) * pageSize; // set the lsn
+- // to point to
+- // the beginning
+- // of the next
+- // page.
++ // set the lsn to point to the beginning of the next page.
++ retVal = ((old / pageSize) + 1) * pageSize;
+ }
++
+ next = retVal;
+- forwardPage = true; // as the log record shall cross log page
+- // boundary, we must re-assign the lsn (so
+- // that the log record begins on a different
+- // location.
++
++ // as the log record shall cross log page boundary, we must re-assign the lsn so
++ // that the log record begins on a different location.
++ forwardPage = true;
++
+ prevPage = pageIndex;
+ pageIndex = getNextPageInSequence(pageIndex);
+ }
+@@ -397,109 +349,51 @@
+ */
+ waitUntillPageIsAvailableForWritingLog(pageIndex);
- // Helper constants for accessing fields in an ARecord of anonymous type
-@@ -158,10 +159,11 @@
- public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
- public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
- public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
-+ public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
+- if (!forwardPage && requiresFlushing) {
+- shouldFlushPage = commitResolver.shouldCommitPage(pageIndex, this, commitRequestStatistics);
+- if (shouldFlushPage) {
+- next = ((next / pageSize) + 1) * pageSize; /*
+- * next
+- * represents the
+- * next value of
+- * lsn after this
+- * log record has
+- * been written.
+- * If the page
+- * needs to be
+- * flushed, then
+- * we do not give
+- * any more LSNs
+- * from this
+- * page.
+- */
+- }
+- }
+- if (!lsn.compareAndSet(old, next)) { // Atomic call -> returns true
+- // only when the value
+- // represented by lsn is same as
+- // "old". The value is updated
+- // to "next".
++ if (!lsn.compareAndSet(old, next)) {
++ // Atomic call -> returns true only when the value represented by lsn is the same as
++ // "old". The value is updated to "next".
+ continue;
+ }
- private static final ARecordType createDatasetRecordType() {
- String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
-- "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
-+ "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
+ if (forwardPage) {
+- //TODO
+- //this is not safe since the incoming thread may reach the same page slot with this page
+- //(differ by the log buffer size)
+- logPageStatus[prevPage].set(PageState.INACTIVE); // mark
+- // previous
+- // page
+- // inactive
++ addFlushRequest(prevPage, old, false);
- List<IAType> internalRecordUnionList = new ArrayList<IAType>();
- internalRecordUnionList.add(BuiltinType.ANULL);
-@@ -179,7 +181,8 @@
- AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
-
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-- internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
-+ internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
-+ BuiltinType.AINT32 };
- return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
+- /*
+- * decrement on the behalf of the log manager. if there are no
+- * more owners (count == 0) the page must be marked as a
+- * candidate to be flushed.
+- */
+- int pageDirtyCount = getLogPageOwnershipCount(prevPage).decrementAndGet();
+- if (pageDirtyCount == 0) {
+- addFlushRequest(prevPage);
+- }
+-
+- /*
+- * The transaction thread that discovers the need to forward a
+- * page is made to re-acquire a lsn.
+- */
++ // The transaction thread that discovers the need to forward a
++ // page is made to re-acquire an lsn.
+ continue;
++
+ } else {
+- /*
+- * the transaction thread has been given a space in a log page,
+- * but is made to wait until the page is available.
+- */
++ // the transaction thread has been given a space in a log page,
++ // but is made to wait until the page is available.
++ // (Is this needed? when does this wait happen?)
+ waitUntillPageIsAvailableForWritingLog(pageIndex);
+- /*
+- * increment the counter as the transaction thread now holds a
+- * space in the log page and hence is an owner.
+- */
++
++ // increment the counter as the transaction thread now holds a
++ // space in the log page and hence is an owner.
+ logPageOwnerCount[pageIndex].incrementAndGet();
+- }
+- if (requiresFlushing) {
+- if (!shouldFlushPage) {
+- /*
+- * the log record requires the page to be flushed but under
+- * the commit policy, the flush task has been deferred. The
+- * transaction thread submits its request to flush the page.
+- */
+- commitRequestStatistics.registerCommitRequest(pageIndex);
+- } else {
+- /*
+- * the flush request was approved by the commit resolver.
+- * Thus the page is marked INACTIVE as no more logs will be
+- * written on this page. The log manager needs to release
+- * its ownership. Note that transaction threads may still
+- * continue to be owners of the log page till they fill up
+- * the space allocated to them.
+- */
+- logPageStatus[pageIndex].set(PageState.INACTIVE);
+- logPageOwnerCount[pageIndex].decrementAndGet(); // on
+- // the
+- // behalf
+- // of
+- // log
+- // manager
++
++ // Before the count is incremented, if the flusher has already flushed the allocated page,
++ // then retry to get a new LSN. Otherwise, the log record with the allocated lsn would be lost.
++ if (lastFlushedLsn.get() >= retVal) {
++ logPageOwnerCount[pageIndex].decrementAndGet();
++ continue;
+ }
+ }
++
+ return retVal;
+ }
}
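
The loop above allocates LSNs lock-free: each thread computes the next value and only wins the slot if the compareAndSet on the shared counter succeeds, otherwise it retries; the extra lastFlushedLsn check guards against handing out an offset on a page the flusher has already written. A stripped-down sketch of the same allocate-by-CAS pattern (page-boundary handling omitted):

    import java.util.concurrent.atomic.AtomicLong;

    class LsnAllocatorSketch {
        private final AtomicLong lsn = new AtomicLong(0);

        // Reserve entrySize bytes of log space and return the starting LSN.
        long allocate(int entrySize) {
            while (true) {
                long old = lsn.get();
                long next = old + entrySize;
                // Succeeds only if no other thread advanced the counter in the meantime.
                if (lsn.compareAndSet(old, next)) {
                    return old;
                }
                // Lost the race: recompute from the new counter value and try again.
            }
        }
    }
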
-@@ -264,13 +267,14 @@
- public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
- public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
- public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
-+ public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
+ @Override
+- public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
++ public void log(byte logType, TransactionContext txnCtx, int datasetId, int PKHashValue, long resourceId,
+ byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
+ LogicalLogLocator logicalLogLocator) throws ACIDException {
+- /*
+- * logLocator is a re-usable object that is appropriately set in each
+- * invocation. If the reference is null, the log manager must throw an
+- * exception
+- */
++
++ HashMap<TransactionContext, Integer> map = null;
++ int activeTxnCount;
++
++ // logLocator is a re-usable object that is appropriately set in each invocation.
++ // If the reference is null, the log manager must throw an exception.
+ if (logicalLogLocator == null) {
+ throw new ACIDException(
+ " you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +"
+@@ -519,20 +413,19 @@
- private static final ARecordType createIndexRecordType() {
- AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
- String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
-- "IsPrimary", "Timestamp" };
-+ "IsPrimary", "Timestamp", "PendingOp" };
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
-- olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
-+ olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
- return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
- };
+ // all constraints checked and we are good to go and acquire a lsn.
+ long previousLSN = -1;
+- long currentLSN; // the will be set to the location (a long value)
+- // where the log record needs to be placed.
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
-===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy)
-@@ -31,6 +31,7 @@
- import edu.uci.ics.asterix.metadata.IDatasetDetails;
- import edu.uci.ics.asterix.metadata.MetadataManager;
- import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
- import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
- import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
- import edu.uci.ics.asterix.metadata.entities.Dataset;
-@@ -226,7 +227,7 @@
- public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
- String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
- String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
-- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
-+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
- }
+- /*
+- * The logs written by a transaction need to be linked to each other for
+- * a successful rollback/recovery. However there could be multiple
+- * threads operating concurrently that are part of a common transaction.
+- * These threads need to synchronize and record the lsn corresponding to
+- * the last log record written by (any thread of) the transaction.
+- */
+- synchronized (context) {
+- previousLSN = context.getLastLogLocator().getLsn();
++ // this will be set to the location (a long value) where the log record needs to be placed.
++ long currentLSN;
++
++ // The logs written by a transaction need to be linked to each other for
++ // a successful rollback/recovery. However there could be multiple
++ // threads operating concurrently that are part of a common transaction.
++ // These threads need to synchronize and record the lsn corresponding to
++ // the last log record written by (any thread of) the transaction.
++ synchronized (txnCtx) {
++ previousLSN = txnCtx.getLastLogLocator().getLsn();
+ currentLSN = getLsn(totalLogSize, logType);
+- context.setLastLSN(currentLSN);
++ txnCtx.setLastLSN(currentLSN);
+ if (IS_DEBUG_MODE) {
+ System.out.println("--------------> LSN(" + currentLSN + ") is allocated");
+ }
+@@ -547,48 +440,37 @@
+ * performed correctly that is ownership is released.
+ */
- public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
-@@ -236,7 +237,7 @@
- primaryIndexes[i].getNodeGroupName());
- MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
- primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
-- id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
-+ id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
- }
- }
+- boolean decremented = false; // indicates if the transaction thread
+- // has release ownership of the
+- // page.
+- boolean addedFlushRequest = false; // indicates if the transaction
+- // thread has submitted a flush
+- // request.
++ // indicates if the transaction thread has released ownership of the page.
++ boolean decremented = false;
-@@ -267,7 +268,7 @@
- for (int i = 0; i < secondaryIndexes.length; i++) {
- MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
- secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
-- secondaryIndexes[i].getPartitioningExpr(), false));
-+ secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
- }
- }
+ int pageIndex = (int) getLogPageIndex(currentLSN);
-Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
-===================================================================
---- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061)
-+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
-@@ -95,11 +95,11 @@
- File testFile = tcCtx.getTestFile(cUnit);
-
- /*************** to avoid run failure cases ****************
-- if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
-+ if (!testFile.getAbsolutePath().contains("index-selection/")) {
- continue;
- }
- ************************************************************/
+- /*
+- * the lsn has been obtained for the log record. need to set the
+- * LogLocator instance accordingly.
+- */
-
-+
- File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
- File actualFile = new File(PATH_ACTUAL + File.separator
- + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
-Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
-===================================================================
---- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061)
-+++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy)
-@@ -90,7 +90,7 @@
++ // the lsn has been obtained for the log record. need to set the
++ // LogLocator instance accordingly.
+ try {
+-
+ logicalLogLocator.setBuffer(logPages[pageIndex]);
+ int pageOffset = getLogPageOffset(currentLSN);
+ logicalLogLocator.setMemoryOffset(pageOffset);
- private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
+- /*
+- * write the log header.
+- */
+- logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
++ // write the log header.
++ logRecordHelper.writeLogHeader(logicalLogLocator, logType, txnCtx, datasetId, PKHashValue, previousLSN,
+ resourceId, resourceMgrId, logContentSize);
-- public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
-+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
- AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
- ACIDException, AsterixException {
+ // increment the offset so that the transaction can fill up the
+ // content in the correct region of the allocated space.
+ logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
-@@ -111,67 +111,10 @@
- throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
- }
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-- return new JobSpecification[0];
-+ return new JobSpecification();
- }
--
-- List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
-- dataset.getDatasetName());
-- int numSecondaryIndexes = 0;
-- for (Index index : datasetIndexes) {
-- if (index.isSecondaryIndex()) {
-- numSecondaryIndexes++;
-- }
-- }
-- JobSpecification[] specs;
-- if (numSecondaryIndexes > 0) {
-- specs = new JobSpecification[numSecondaryIndexes + 1];
-- int i = 0;
-- // First, drop secondary indexes.
-- for (Index index : datasetIndexes) {
-- if (index.isSecondaryIndex()) {
-- specs[i] = new JobSpecification();
-- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
-- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
-- datasetName, index.getIndexName());
-- IIndexDataflowHelperFactory dfhFactory;
-- switch (index.getIndexType()) {
-- case BTREE:
-- dfhFactory = new LSMBTreeDataflowHelperFactory(
-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
-- break;
-- case RTREE:
-- dfhFactory = new LSMRTreeDataflowHelperFactory(
-- new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
-- new IBinaryComparatorFactory[] { null },
-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
-- break;
-- case NGRAM_INVIX:
-- case WORD_INVIX:
-- dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
-- break;
-- default:
-- throw new AsterixException("Unknown index type provided.");
-- }
-- IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
-- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
-- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
-- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
-- idxSplitsAndConstraint.second);
-- i++;
-- }
-- }
-- } else {
-- specs = new JobSpecification[1];
-- }
-+
- JobSpecification specPrimary = new JobSpecification();
-- specs[specs.length - 1] = specPrimary;
+- // a COMMIT log record does not have any content
+- // and hence the logger (responsible for putting the log content) is
+- // not invoked.
++ // a COMMIT log record does not have any content and hence
++ // the logger (responsible for putting the log content) is not invoked.
+ if (logContentSize != 0) {
+- logger.preLog(context, reusableLogContentObject);
++ logger.preLog(txnCtx, reusableLogContentObject);
+ }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
-@@ -187,7 +130,7 @@
+ if (logContentSize != 0) {
+ // call the logger implementation and ask to fill in the log
+ // record content at the allocated space.
+- logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
+- logger.postLog(context, reusableLogContentObject);
++ logger.log(txnCtx, logicalLogLocator, logContentSize, reusableLogContentObject);
++ logger.postLog(txnCtx, reusableLogContentObject);
+ if (IS_DEBUG_MODE) {
+ logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset()
+ - logRecordHelper.getLogHeaderSize(logType));
+@@ -597,10 +479,8 @@
+ }
+ }
- specPrimary.addRoot(primaryBtreeDrop);
+- /*
+- * The log record has been written. For integrity checks, compute
+- * the checksum and put it at the end of the log record.
+- */
++ // The log record has been written. For integrity checks, compute
++ // the checksum and put it at the end of the log record.
+ int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
+ int length = totalLogSize - logRecordHelper.getLogChecksumSize();
+ long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
+@@ -611,46 +491,31 @@
+ System.out.println("--------------> LSN(" + currentLSN + ") is written");
+ }
-- return specs;
-+ return specPrimary;
- }
+- /*
+- * release the ownership as the log record has been placed in
+- * created space.
+- */
+- int pageDirtyCount = logPageOwnerCount[pageIndex].decrementAndGet();
++ // release the ownership as the log record has been placed in the allocated space.
++ logPageOwnerCount[pageIndex].decrementAndGet();
- public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
-Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
-===================================================================
---- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061)
-+++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy)
-@@ -21,6 +21,8 @@
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
-+import java.util.concurrent.locks.ReadWriteLock;
-+import java.util.concurrent.locks.ReentrantReadWriteLock;
+ // indicating that the transaction thread has released ownership
+ decremented = true;
- import org.json.JSONException;
+- /*
+- * If the transaction thread happens to be the last owner of the log
+- * page the page must by marked as a candidate to be flushed.
+- */
+- if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) {
+- addFlushRequest(pageIndex);
+- addedFlushRequest = true;
+- }
+-
+- /*
+- * If the log type is commit, a flush request is registered, if the
+- * log record has not reached the disk. It may be possible that this
+- * thread does not get CPU cycles and in-between the log record has
+- * been flushed to disk because the containing log page filled up.
+- */
+- if (logType == LogType.COMMIT) {
+- synchronized (logPages[pageIndex]) {
+- while (getLastFlushedLsn().get() < currentLSN) {
+- logPages[pageIndex].wait();
+- }
++ if (logType == LogType.ENTITY_COMMIT) {
++ map = activeTxnCountMaps.get(pageIndex);
++ if (map.containsKey(txnCtx)) {
++ activeTxnCount = (Integer) map.get(txnCtx);
++ activeTxnCount++;
++ map.put(txnCtx, activeTxnCount);
++ } else {
++ map.put(txnCtx, 1);
+ }
++ addFlushRequest(pageIndex, currentLSN, false);
++ } else if (logType == LogType.COMMIT) {
++ addFlushRequest(pageIndex, currentLSN, true);
+ }
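
To summarize the branch above: an ENTITY_COMMIT record only registers the transaction in the page's activeTxnCountMaps entry and requests an asynchronous (group) flush, while a job-level COMMIT forces a synchronous flush so the caller does not return before the record is on disk. A small sketch of that dispatch, with the flush call reduced to an interface method; the constant values are illustrative, not the real LogType constants:

    import java.util.HashMap;
    import java.util.Map;

    class CommitLogDispatchSketch {
        static final byte COMMIT = 2;        // illustrative values only
        static final byte ENTITY_COMMIT = 3;

        interface Flusher {
            void requestFlush(int pageIndex, long lsn, boolean isSynchronous);
        }

        // One map per log page: how many ENTITY_COMMIT records each transaction wrote on it.
        private final Map<Object, Integer> activeTxnCountOnPage = new HashMap<Object, Integer>();

        void onCommitLogWritten(byte logType, Object txnCtx, int pageIndex, long lsn, Flusher flusher) {
            if (logType == ENTITY_COMMIT) {
                Integer count = activeTxnCountOnPage.get(txnCtx);
                activeTxnCountOnPage.put(txnCtx, count == null ? 1 : count + 1);
                flusher.requestFlush(pageIndex, lsn, false); // group commit: do not block the writer
            } else if (logType == COMMIT) {
                flusher.requestFlush(pageIndex, lsn, true);  // job commit: wait until the page is on disk
            }
        }
    }
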
-@@ -68,6 +70,7 @@
- import edu.uci.ics.asterix.metadata.MetadataException;
- import edu.uci.ics.asterix.metadata.MetadataManager;
- import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
- import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
- import edu.uci.ics.asterix.metadata.entities.Dataset;
- import edu.uci.ics.asterix.metadata.entities.Datatype;
-@@ -112,6 +115,7 @@
- private final PrintWriter out;
- private final SessionConfig sessionConfig;
- private final DisplayFormat pdf;
-+ private final ReadWriteLock cacheLatch;
- private Dataverse activeDefaultDataverse;
- private List<FunctionDecl> declaredFunctions;
+ } catch (Exception e) {
+ e.printStackTrace();
+- throw new ACIDException(context, "Thread: " + Thread.currentThread().getName()
++ throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
+ + " logger encountered exception", e);
+ } finally {
+- /*
+- * If an exception was encountered and we did not release ownership
+- */
+ if (!decremented) {
+ logPageOwnerCount[pageIndex].decrementAndGet();
+ }
+@@ -667,9 +532,6 @@
-@@ -121,6 +125,7 @@
- this.out = out;
- this.sessionConfig = pc;
- this.pdf = pdf;
-+ this.cacheLatch = new ReentrantReadWriteLock(true);
- declaredFunctions = getDeclaredFunctions(aqlStatements);
+ logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
+ logManagerProperties.getLogPageSize());
+-
+- //TODO Check if this is necessary
+- //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0);
}
-@@ -143,8 +148,7 @@
+ @Override
+@@ -747,16 +609,13 @@
+ //minimize memory allocation overhead. current code allocates the log page size per reading a log record.
- for (Statement stmt : aqlStatements) {
- validateOperation(activeDefaultDataverse, stmt);
-- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
-+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
- metadataProvider.setWriterFactory(writerFactory);
- metadataProvider.setOutputFile(outputFile);
- metadataProvider.setConfig(config);
-@@ -253,15 +257,9 @@
- }
+ byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
+- // take a lock on the log page so that the page is not flushed to
+- // disk interim
++
++ // take a lock on the log page so that the page is not flushed to disk in the interim
+ synchronized (logPages[pageIndex]) {
+- if (lsnValue > getLastFlushedLsn().get()) { // need to check
+- // again
+- // (this
+- // thread may have got
+- // de-scheduled and must
+- // refresh!)
- }
-- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e) {
-- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw new AlgebricksException(e);
++ // need to check again (this thread may have got de-scheduled and must refresh!)
++ if (lsnValue > getLastFlushedLsn().get()) {
++
+ // get the log record length
+ logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
+ byte logType = pageContent[pageOffset + 4];
+@@ -765,9 +624,7 @@
+ int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
+ logRecord = new byte[logRecordSize];
+
+- /*
+- * copy the log record content
+- */
++ // copy the log record content
+ System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
+ MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
+ if (logicalLogLocator == null) {
+@@ -790,9 +647,7 @@
}
-- // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
-- for (JobSpecification jobspec : jobsToExecute) {
-- runJob(hcc, jobspec);
-- }
}
- return executionResult;
+
+- /*
+- * the log record is residing on the disk, read it from there.
+- */
++ // the log record is residing on the disk, read it from there.
+ readDiskLog(lsnValue, logicalLogLocator);
}
-@@ -289,398 +287,802 @@
- private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
-- DataverseDecl dvd = (DataverseDecl) stmt;
-- String dvName = dvd.getDataverseName().getValue();
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-- if (dv == null) {
-- throw new MetadataException("Unknown dataverse " + dvName);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ DataverseDecl dvd = (DataverseDecl) stmt;
-+ String dvName = dvd.getDataverseName().getValue();
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-+ if (dv == null) {
-+ throw new MetadataException("Unknown dataverse " + dvName);
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return dv;
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new MetadataException(e);
-+ } finally {
-+ releaseReadLatch();
- }
-- return dv;
+@@ -860,30 +715,40 @@
+ return logPageOwnerCount[pageIndex];
}
- private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
- ACIDException {
-- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-- String dvName = stmtCreateDataverse.getDataverseName().getValue();
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-- if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
-- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
-+ String dvName = stmtCreateDataverse.getDataverseName().getValue();
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
-+ if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
-+ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
-+ }
-+ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-+ stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
-- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
-- stmtCreateDataverse.getFormat()));
+- public ICommitResolver getCommitResolver() {
+- return commitResolver;
+- }
+-
+- public CommitRequestStatistics getCommitRequestStatistics() {
+- return commitRequestStatistics;
+- }
+-
+ public IFileBasedBuffer[] getLogPages() {
+ return logPages;
}
- private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
-- DatasetDecl dd = (DatasetDecl) stmt;
-- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
-- : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-- }
-- String datasetName = dd.getName().getValue();
-- DatasetType dsType = dd.getDatasetType();
-- String itemTypeName = dd.getItemTypeName().getValue();
-
-- IDatasetDetails datasetDetails = null;
-- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-- datasetName);
-- if (ds != null) {
-- if (dd.getIfNotExists()) {
-- return;
-- } else {
-- throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
+- public int getLastFlushedPage() {
+- return lastFlushedPage.get();
+- }
+-
+- public void setLastFlushedPage(int lastFlushedPage) {
+- this.lastFlushedPage.set(lastFlushedPage);
+- }
+-
+ @Override
+ public TransactionSubsystem getTransactionSubsystem() {
+ return provider;
+ }
+
-+ try {
-+ DatasetDecl dd = (DatasetDecl) stmt;
-+ String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
-+ : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
- }
-- }
-- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-- itemTypeName);
-- if (dt == null) {
-- throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
-- }
-- switch (dd.getDatasetType()) {
-- case INTERNAL: {
-- IAType itemType = dt.getDatatype();
-- if (itemType.getTypeTag() != ATypeTag.RECORD) {
-- throw new AlgebricksException("Can only partition ARecord's.");
-+ String datasetName = dd.getName().getValue();
-+ DatasetType dsType = dd.getDatasetType();
-+ String itemTypeName = dd.getItemTypeName().getValue();
++ public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
++ TransactionContext ctx = null;
++ int count = 0;
++ int i = 0;
+
-+ IDatasetDetails datasetDetails = null;
-+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-+ datasetName);
-+ if (ds != null) {
-+ if (dd.getIfNotExists()) {
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return;
-+ } else {
-+ throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
- }
-- List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-- .getPartitioningExprs();
-- String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-- datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
-- break;
- }
-- case EXTERNAL: {
-- String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-- Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-- datasetDetails = new ExternalDatasetDetails(adapter, properties);
-- break;
-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
-+ itemTypeName);
-+ if (dt == null) {
-+ throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
- }
-- case FEED: {
-- IAType itemType = dt.getDatatype();
-- if (itemType.getTypeTag() != ATypeTag.RECORD) {
-- throw new AlgebricksException("Can only partition ARecord's.");
-+ switch (dd.getDatasetType()) {
-+ case INTERNAL: {
-+ IAType itemType = dt.getDatatype();
-+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
-+ throw new AlgebricksException("Can only partition ARecord's.");
++ HashMap<TransactionContext, Integer> map = activeTxnCountMaps.get(pageIndex);
++ Set<Map.Entry<TransactionContext, Integer>> entrySet = map.entrySet();
++ if (entrySet != null) {
++ for (Map.Entry<TransactionContext, Integer> entry : entrySet) {
++ if (entry != null) {
++ if (entry.getValue() != null) {
++ count = entry.getValue();
+ }
-+ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
-+ .getPartitioningExprs();
-+ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-+ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-+ ngName);
-+ break;
- }
-- List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
-- String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-- String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
-- Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
-- FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
-- datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
-- adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
-- break;
-+ case EXTERNAL: {
-+ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
-+ datasetDetails = new ExternalDatasetDetails(adapter, properties);
-+ break;
-+ }
-+ case FEED: {
-+ IAType itemType = dt.getDatatype();
-+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
-+ throw new AlgebricksException("Can only partition ARecord's.");
++ if (count > 0) {
++ ctx = entry.getKey();
++ for (i = 0; i < count; i++) {
++ ctx.decreaseActiveTransactionCountOnIndexes();
++ }
+ }
-+ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
-+ .getPartitioningExprs();
-+ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
-+ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
-+ Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
-+ .getConfiguration();
-+ FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
-+ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
-+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
-+ ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
-+ break;
+ }
- }
-+
-+ //#. add a new dataset with PendingAddOp
-+ Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
-+ DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
-+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
-+
-+ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
-+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-+ dataverseName);
-+ JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
-+ metadataProvider);
-+
-+ //#. make metadataTxn commit before calling runJob.
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ //#. runJob
-+ runJob(hcc, jobSpec);
-+
-+ //#. begin new metadataTxn
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ }
++ }
+
-+ //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
-+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
-+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
-+ datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
-+ IMetadataEntity.PENDING_NO_OP));
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
-- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
-- datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
-- if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
-- Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
-- dataverseName);
-- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
-- }
- }
++ map.clear();
++ }
+ }
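
// Hypothetical companion sketch for decrementActiveTxnCountOnIndexes(...) above:
// each log page keeps a map from a transaction to the number of entity commits it
// logged on that page, and the flusher drains the map once the page is durable.
// The producer method below is an assumption about how the counts get registered;
// only the drain loop mirrors the patch.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ActiveTxnCountSketch {
    // one map per log page: txn id -> number of entity commits logged on that page
    private final List<Map<Integer, Integer>> activeTxnCountMaps = new ArrayList<>();

    ActiveTxnCountSketch(int numLogPages) {
        for (int i = 0; i < numLogPages; i++) {
            activeTxnCountMaps.add(new HashMap<>());
        }
    }

    // assumed producer side: called while a transaction writes an ENTITY_COMMIT record
    void registerEntityCommit(int pageIndex, int txnId) {
        activeTxnCountMaps.get(pageIndex).merge(txnId, 1, Integer::sum);
    }

    // consumer side, mirroring the drain performed after the page has been flushed
    void drainAfterFlush(int pageIndex) {
        Map<Integer, Integer> map = activeTxnCountMaps.get(pageIndex);
        for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                // stand-in for ctx.decreaseActiveTransactionCountOnIndexes()
                System.out.println("decrement index txn count for txn " + entry.getKey());
            }
        }
        map.clear();
    }
}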
- private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-- }
-- String datasetName = stmtCreateIndex.getDatasetName().getValue();
-- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-- datasetName);
-- if (ds == null) {
-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-- + dataverseName);
-- }
-- String indexName = stmtCreateIndex.getIndexName().getValue();
-- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-- datasetName, indexName);
-- if (idx != null) {
-- if (!stmtCreateIndex.getIfNotExists()) {
-- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-- } else {
-- stmtCreateIndex.setNeedToCreate(false);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
-+ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
- }
-- } else {
-+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
-+
-+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-+ datasetName);
-+ if (ds == null) {
-+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-+ + dataverseName);
-+ }
-+
-+ String indexName = stmtCreateIndex.getIndexName().getValue();
-+ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-+ datasetName, indexName);
-+
-+ if (idx != null) {
-+ if (!stmtCreateIndex.getIfNotExists()) {
-+ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
-+ } else {
-+ stmtCreateIndex.setNeedToCreate(false);
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return;
-+ }
-+ }
-+
-+ //#. add a new index with PendingAddOp
- Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
-- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
-+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
-+ IMetadataEntity.PENDING_ADD_OP);
- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
+ /*
+@@ -895,36 +760,82 @@
+ class LogPageFlushThread extends Thread {
-+ //#. create the index artifact in NC.
- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-- JobSpecification loadIndexJobSpec = IndexOperations
-- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
-- runJob(hcc, loadIndexJobSpec);
-+ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
-+ if (spec == null) {
-+ throw new AsterixException("Failed to create job spec for creating index '"
-+ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ runJob(hcc, spec);
-+
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. load data into the index in NC.
-+ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
-+ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ runJob(hcc, spec);
-+
-+ //#. begin new metadataTxn
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
-+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-+ indexName);
-+ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
-+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
-+ IMetadataEntity.PENDING_NO_OP);
-+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
+ private LogManager logManager;
++ /*
++ * flushRequestQueue has one entry per log page, indexed by page index. When a
++ * (transaction) thread discovers the need to flush a page, it deposits an
++ * object into the corresponding value that is a
++ * LinkedBlockingQueue. The LogManager has a LogPageFlushThread that scans
++ * these queues in order of page index (and circling around). The flusher thread
++ * needs to flush pages in order and waits for a thread to deposit an object
++ * in the blocking queue corresponding to the next page in order. A request
++ * to flush a page is conveyed to the flush thread by simply depositing an
++ * object into the corresponding blocking queue. It is blocking in the sense
++ * that the flusher thread will continue to wait for an object to arrive in
++ * the queue. The object itself is ignored by the flusher and just acts as
++ * a signal/event that a page needs to be flushed.
++ */
++ private final LinkedBlockingQueue<Object>[] flushRequestQueue;
++ private final Object[] flushRequests;
++ private int lastFlushedPageIndex;
++ private final long groupCommitWaitPeriod;
- private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
- MetadataException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- TypeDecl stmtCreateType = (TypeDecl) stmt;
-- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-- }
-- String typeName = stmtCreateType.getIdent().getValue();
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-- if (dv == null) {
-- throw new AlgebricksException("Unknonw dataverse " + dataverseName);
-- }
-- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-- if (dt != null) {
-- if (!stmtCreateType.getIfNotExists())
-- throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
-- } else {
-- if (builtinTypeMap.get(typeName) != null) {
-- throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ TypeDecl stmtCreateType = (TypeDecl) stmt;
-+ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+ String typeName = stmtCreateType.getIdent().getValue();
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-+ if (dv == null) {
-+ throw new AlgebricksException("Unknonw dataverse " + dataverseName);
-+ }
-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-+ if (dt != null) {
-+ if (!stmtCreateType.getIfNotExists()) {
-+ throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
-+ }
- } else {
-- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
-- dataverseName);
-- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
-- IAType type = typeMap.get(typeSignature);
-- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
-+ if (builtinTypeMap.get(typeName) != null) {
-+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
-+ } else {
-+ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
-+ dataverseName);
-+ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
-+ IAType type = typeMap.get(typeSignature);
-+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
-+ }
- }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
+ public LogPageFlushThread(LogManager logManager) {
+ this.logManager = logManager;
+ setName("Flusher");
++ int numLogPages = logManager.getLogManagerProperties().getNumLogPages();
++ this.flushRequestQueue = new LinkedBlockingQueue[numLogPages];
++ this.flushRequests = new Object[numLogPages];
++ for (int i = 0; i < numLogPages; i++) {
++ flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
++ flushRequests[i] = new Object();
++ }
++ this.lastFlushedPageIndex = -1;
++ groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
}
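
// Minimal, self-contained sketch of the signalling idiom set up in this
// constructor: one LinkedBlockingQueue of capacity 1 per log page, where offer()
// records at most one pending flush request and take() blocks the flusher until a
// request arrives. The names here are illustrative and not part of the patch.
import java.util.concurrent.LinkedBlockingQueue;

class FlushSignalSketch {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Object> signal = new LinkedBlockingQueue<Object>(1);
        Object token = new Object();

        // producer side: a duplicate request is silently dropped by the bounded queue
        signal.offer(token);
        signal.offer(token); // no effect, a request is already pending

        // consumer side: blocks until a request is present, then consumes it
        signal.take();
        System.out.println("flush request consumed, queue empty: " + signal.isEmpty());
    }
}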
- private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-- String dvName = stmtDelete.getDataverseName().getValue();
-
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
-- if (dv == null) {
-- if (!stmtDelete.getIfExists()) {
-- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
-+ String dvName = stmtDelete.getDataverseName().getValue();
-+
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
-+ if (dv == null) {
-+ if (!stmtDelete.getIfExists()) {
-+ throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
++ public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
++ synchronized (logManager.getLogPage(pageIndex)) {
++ //return if flushedLSN >= lsn
++ if (logManager.getLastFlushedLsn().get() >= lsn) {
+ return;
- }
-- } else {
-+
-+ //#. prepare jobs which will drop corresponding datasets with indexes.
- List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
- for (int j = 0; j < datasets.size(); j++) {
- String datasetName = datasets.get(j).getDatasetName();
- DatasetType dsType = datasets.get(j).getDatasetType();
- if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
-+
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
- for (int k = 0; k < indexes.size(); k++) {
- if (indexes.get(k).isSecondaryIndex()) {
-- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
-- metadataProvider);
-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
-+ indexes.get(k).getIndexName());
-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
- }
- }
-+
-+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
-+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
- }
-- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
- }
-
-+ //#. mark PendingDropOp on the dataverse record by
-+ // first, deleting the dataverse record from the DATAVERSE_DATASET
-+ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
- MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
-+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
-+ IMetadataEntity.PENDING_DROP_OP));
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ for (JobSpecification jobSpec : jobsToExecute) {
-+ runJob(hcc, jobSpec);
+ }
+
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
++ //put a new request into the queue only if a request for this page is not already queued
++ //(offer() on the capacity-1 queue silently drops the duplicate).
++ flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]);
+
-+ //#. finally, delete the dataverse.
-+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
- if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
- activeDefaultDataverse = null;
- }
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
++ //return if the request is asynchronous
++ if (!isSynchronous) {
++ return;
+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
-
- private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- DropStatement stmtDelete = (DropStatement) stmt;
-- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-- }
-- String datasetName = stmtDelete.getDatasetName().getValue();
-- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-- if (ds == null) {
-- if (!stmtDelete.getIfExists())
-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-- + dataverseName + ".");
-- } else {
+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ DropStatement stmtDelete = (DropStatement) stmt;
-+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+ String datasetName = stmtDelete.getDatasetName().getValue();
-+
-+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-+ if (ds == null) {
-+ if (!stmtDelete.getIfExists()) {
-+ throw new AlgebricksException("There is no dataset with this name " + datasetName
-+ + " in dataverse " + dataverseName + ".");
++ //wait until the flusher finishes flushing this page and notifies waiters.
++ boolean isNotified = false;
++ while (!isNotified) {
++ try {
++ logManager.getLogPage(pageIndex).wait();
++ isNotified = true;
++ } catch (InterruptedException e) {
++ e.printStackTrace();
+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ return;
+ }
++ }
++ }
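
// Toy sketch of the synchronous path of requestFlush(...) above: a committing
// thread waits on the log page's monitor until the flusher forces the page and
// calls notifyAll(). This is an illustration of the handshake under simplified
// assumptions (a plain Object stands in for the log page); it is not the
// LogManager API, and unlike the patch it re-checks the flushed LSN in a loop.
class SyncFlushHandshakeSketch {
    private final Object page = new Object();
    private long lastFlushedLsn = -1;

    void awaitFlushed(long lsn) throws InterruptedException {
        synchronized (page) {
            while (lastFlushedLsn < lsn) {
                page.wait(); // woken by the flusher's notifyAll()
            }
        }
    }

    void markFlushed(long lsn) {
        synchronized (page) {
            lastFlushedLsn = lsn;
            page.notifyAll(); // wake every transaction waiting on this page
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SyncFlushHandshakeSketch s = new SyncFlushHandshakeSketch();
        new Thread(() -> {
            try {
                Thread.sleep(10); // pretend to force the page to disk
            } catch (InterruptedException ignored) {
            }
            s.markFlushed(100L);
        }).start();
        s.awaitFlushed(100L);
        System.out.println("commit record durable");
    }
}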
+
- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-+
-+ //#. prepare jobs to drop the datatset and the indexes in NC
- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
- for (int j = 0; j < indexes.size(); j++) {
-- if (indexes.get(j).isPrimaryIndex()) {
-- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
-- metadataProvider);
-+ if (indexes.get(j).isSecondaryIndex()) {
-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-+ indexes.get(j).getIndexName());
-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
+ @Override
+ public void run() {
+ while (true) {
+ try {
+- int pageToFlush = logManager.getNextPageInSequence(logManager.getLastFlushedPage());
++ int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex);
+
+- /*
+- * A wait call on the linkedBLockingQueue. The flusher thread is
+- * notified when an object is added to the queue. Please note
+- * that each page has an associated blocking queue.
+- */
+- logManager.getPendingFlushRequests(pageToFlush).take();
++ // A wait call on the LinkedBlockingQueue. The flusher thread is
++ // notified when an object is added to the queue. Please note
++ // that each page has an associated blocking queue.
++ flushRequestQueue[pageToFlush].take();
+
+- /*
+- * The LogFlusher was waiting for a page to be marked as a
+- * candidate for flushing. Now that has happened. The thread
+- * shall proceed to take a lock on the log page
+- */
+- synchronized (logManager.getLogPages()[pageToFlush]) {
++ synchronized (logManager.getLogPage(pageToFlush)) {
+
+- /*
+- * lock the internal state of the log manager and create a
+- * log file if necessary.
+- */
++ // lock the internal state of the log manager and create a
++ // log file if necessary.
+ int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get());
+ int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()
+ + logManager.getLogManagerProperties().getLogPageSize());
+@@ -936,198 +847,60 @@
+ logManager.getLogManagerProperties().getLogPageSize());
}
- }
-+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
-+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
-+
-+ //#. mark the existing dataset as PendingDropOp
-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-+ MetadataManager.INSTANCE.addDataset(
-+ mdTxnCtx,
-+ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
-+ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ //#. run the jobs
-+ for (JobSpecification jobSpec : jobsToExecute) {
-+ runJob(hcc, jobSpec);
-+ }
-+
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
- }
-- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
-+
-+ //#. finally, delete the dataset.
-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
- private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-- String datasetName = stmtIndexDrop.getDatasetName().getValue();
-- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
-+ String datasetName = stmtIndexDrop.getDatasetName().getValue();
-+ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+
-+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-+ if (ds == null) {
-+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-+ + dataverseName);
-+ }
-+
-+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-+ String indexName = stmtIndexDrop.getIndexName().getValue();
-+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+ if (index == null) {
-+ if (!stmtIndexDrop.getIfExists()) {
-+ throw new AlgebricksException("There is no index with this name " + indexName + ".");
-+ }
-+ } else {
-+ //#. prepare a job to drop the index in NC.
-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
-+ indexName);
-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
-+
-+ //#. mark PendingDropOp on the existing index
-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+ MetadataManager.INSTANCE.addIndex(
-+ mdTxnCtx,
-+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
-+ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
-+
-+ //#. commit the existing transaction before calling runJob.
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ for (JobSpecification jobSpec : jobsToExecute) {
-+ runJob(hcc, jobSpec);
-+ }
-+
-+ //#. begin a new transaction
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. finally, delete the existing index
-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+ }
-+ } else {
-+ throw new AlgebricksException(datasetName
-+ + " is an external dataset. Indexes are not maintained for external datasets.");
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+
-+ } finally {
-+ releaseWriteLatch();
- }
-- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
-- if (ds == null)
-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-- + dataverseName);
-- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-- String indexName = stmtIndexDrop.getIndexName().getValue();
-- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-- if (idx == null) {
-- if (!stmtIndexDrop.getIfExists())
-- throw new AlgebricksException("There is no index with this name " + indexName + ".");
-- } else
-- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
-- } else {
-- throw new AlgebricksException(datasetName
-- + " is an external dataset. Indexes are not maintained for external datasets.");
-- }
- }
+- logManager.getLogPage(pageToFlush).flush(); // put the
+- // content to
+- // disk, the
+- // thread still
+- // has a lock on
+- // the log page
++ //#. sleep for the group commit wait period (groupCommitWaitPeriod)
++ sleep(groupCommitWaitPeriod);
- private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
- ACIDException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
-- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
-- if (dataverseName == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
-+ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
-+ if (dataverseName == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+ String typeName = stmtTypeDrop.getTypeName().getValue();
-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-+ if (dt == null) {
-+ if (!stmtTypeDrop.getIfExists())
-+ throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
-+ } else {
-+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
-- String typeName = stmtTypeDrop.getTypeName().getValue();
-- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
-- if (dt == null) {
-- if (!stmtTypeDrop.getIfExists())
-- throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
-- } else {
-- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
-- }
- }
+- /*
+- * acquire lock on the log manager as we need to update the
+- * internal bookkeeping data.
+- */
++ //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page.
++ logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
- private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
- ACIDException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
-- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
-- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
-- if (ng == null) {
-- if (!stmtDelete.getIfExists())
-- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
-- } else {
-- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
-+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
-+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
-+ if (ng == null) {
-+ if (!stmtDelete.getIfExists())
-+ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
-+ } else {
-+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
-+ }
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
+- // increment the last flushed lsn.
+- long lastFlushedLsn = logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties()
+- .getLogPageSize());
++ //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER)
++ // meaning everyone has finished writing logs on this page.
++ while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
++ sleep(0);
++ }
- private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
- ACIDException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
-- if (dataverse == null) {
-- throw new AlgebricksException(" dataverse not specified ");
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
-+ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
-+ if (dataverse == null) {
-+ throw new AlgebricksException(" dataverse not specified ");
-+ }
-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
-+ if (dv == null) {
-+ throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
-+ }
-+ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
-+ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
-+ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
-+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
-- if (dv == null) {
-- throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
-- }
-- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
-- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
-- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
-- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
- }
+- /*
+- * the log manager gains back ownership of the page. this is
+- * reflected by incrementing the owner count of the page.
+- * recall that when the page is begin flushed the owner
+- * count is actually 0 Value of zero implicitly indicates
+- * that the page is operated upon by the log flusher thread.
+- */
+- logManager.getLogPageOwnershipCount(pageToFlush).incrementAndGet();
++ //#. set the logPageOwnerCount to 0 (LOG_FLUSHER)
++ // meaning it is flushing.
++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
- private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
- AlgebricksException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
-- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
-- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
-- if (function == null) {
-- if (!stmtDropFunction.getIfExists())
-- throw new AlgebricksException("Unknonw function " + signature);
-- } else {
-- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
-+ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
-+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
-+ if (function == null) {
-+ if (!stmtDropFunction.getIfExists())
-+ throw new AlgebricksException("Unknonw function " + signature);
-+ } else {
-+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
-+ }
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
- }
- }
+- /*
+- * get the number of log buffers that have been written so
+- * far. A log buffer = number of log pages * size of a log
+- * page
+- */
+- int numCycles = (int) lastFlushedLsn / logManager.getLogManagerProperties().getLogBufferSize();
+- if (lastFlushedLsn % logManager.getLogManagerProperties().getLogBufferSize() == 0) {
+- numCycles--;
+- }
++ // put the content to disk (the thread still has a lock on the log page)
++ logManager.getLogPage(pageToFlush).flush();
- private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
-- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
-- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
-- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
+- /*
+- * Map the log page to a new region in the log file.
+- */
++ // increment the last flushed lsn and update lastFlushedPageIndex
++ logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
++ lastFlushedPageIndex = pageToFlush;
-- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
-- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
-- jobsToExecute.add(job.getJobSpec());
-- // Also load the dataset's secondary indexes.
-- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
-- .getDatasetName().getValue());
-- for (Index index : datasetIndexes) {
-- if (!index.isSecondaryIndex()) {
-- continue;
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
++ // decrement activeTxnCountOnIndexes
++ logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
+
-+ try {
-+ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
-+ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
-+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
-+ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
-+ loadStmt.dataIsAlreadySorted());
++ // reset the owner count to 1 (LOG_WRITER)
++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
+
-+ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
-+ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
-+ jobsToExecute.add(job.getJobSpec());
-+ // Also load the dataset's secondary indexes.
-+ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
-+ .getDatasetName().getValue());
-+ for (Index index : datasetIndexes) {
-+ if (!index.isSecondaryIndex()) {
-+ continue;
-+ }
-+ // Create CompiledCreateIndexStatement from metadata entity 'index'.
-+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
-+ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
-+ index.getIndexType());
-+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
- }
-- // Create CompiledCreateIndexStatement from metadata entity 'index'.
-- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
-- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
-- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ for (JobSpecification jobspec : jobsToExecute) {
-+ runJob(hcc, jobspec);
-+ }
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
- }
++ // Map the log page to a new region in the log file.
+ long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
+ + logManager.getLogManagerProperties().getLogBufferSize();
- private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- metadataProvider.setWriteTransaction(true);
-- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
-- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
-- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
-- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
+- /*
+- * long nextPos = (numCycles + 1)
+- * logManager.getLogManagerProperties() .getLogBufferSize()
+- * + pageToFlush logManager.getLogManagerProperties()
+- * .getLogPageSize();
+- */
+ logManager.resetLogPage(nextWritePosition, pageToFlush);
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-- if (compiled.first != null) {
-- jobsToExecute.add(compiled.first);
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ metadataProvider.setWriteTransaction(true);
-+ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
-+ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
-+ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
-+ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
-+
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
-+ clfrqs);
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+ if (compiled.first != null) {
-+ runJob(hcc, compiled.first);
-+ }
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
- }
+ // mark the page as ACTIVE
+ logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
- private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- metadataProvider.setWriteTransaction(true);
-- InsertStatement stmtInsert = (InsertStatement) stmt;
-- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
-- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
-- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-- if (compiled.first != null) {
-- jobsToExecute.add(compiled.first);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ metadataProvider.setWriteTransaction(true);
-+ InsertStatement stmtInsert = (InsertStatement) stmt;
-+ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
-+ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
-+ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
-+ clfrqs);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ if (compiled.first != null) {
-+ runJob(hcc, compiled.first);
-+ }
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
- }
+- // notify all waiting (transaction) threads.
+- // Transaction thread may be waiting for the page to be
+- // available or may have a commit log record on the page
+- // that got flushed.
+- logManager.getLogPages()[pageToFlush].notifyAll();
+- logManager.setLastFlushedPage(pageToFlush);
++ //#. check whether another flush request for the same log page is already queued.
++ // If so, simply remove it: this flush has already satisfied that request.
++ if (flushRequestQueue[pageToFlush].peek() != null) {
++ flushRequestQueue[pageToFlush].take();
++ }
- private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- metadataProvider.setWriteTransaction(true);
-- DeleteStatement stmtDelete = (DeleteStatement) stmt;
-- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
-- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
-- stmtDelete.getVarCounter(), metadataProvider);
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
-- if (compiled.first != null) {
-- jobsToExecute.add(compiled.first);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ metadataProvider.setWriteTransaction(true);
-+ DeleteStatement stmtDelete = (DeleteStatement) stmt;
-+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
-+ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
-+ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
-+ stmtDelete.getVarCounter(), metadataProvider);
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
-+ clfrqs);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ if (compiled.first != null) {
-+ runJob(hcc, compiled.first);
-+ }
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
++ // notify all waiting (transaction) threads.
++ logManager.getLogPage(pageToFlush).notifyAll();
+ }
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ throw new Error(" exception in flushing log page", ioe);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+- break; // must break from the loop as the exception indicates
+- // some thing horrendous has happened elsewhere
++ break;
+ }
}
}
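
// Sketch of the page-ownership protocol that the flush loop above relies on: the
// count rests at 1 (LOG_WRITER) when the page is idle, each writer increments it
// while copying its record in and decrements it when done, and the flusher waits
// for the count to fall back to 1 before claiming the page with 0 (LOG_FLUSHER).
// The two constants mirror the names used in the patch; everything else is an
// illustrative stand-in, not the actual LogManager code.
import java.util.concurrent.atomic.AtomicInteger;

class PageOwnershipSketch {
    static final int LOG_FLUSHER = 0;
    static final int LOG_WRITER = 1;

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger ownerCount = new AtomicInteger(LOG_WRITER);

        // a writer appends a log record
        ownerCount.incrementAndGet();       // 2: one writer active on the page
        ownerCount.decrementAndGet();       // back to 1: the writer is done

        // the flusher claims the page once all writers have finished
        while (ownerCount.get() != LOG_WRITER) {
            Thread.sleep(0);                // mirrors the busy-wait in the patch
        }
        ownerCount.set(LOG_FLUSHER);        // 0: the page is being flushed
        // ... force the page to disk, then hand ownership back to writers ...
        ownerCount.set(LOG_WRITER);
        System.out.println("owner count restored: " + ownerCount.get());
    }
}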
+-}
+-
+-/*
+- * TODO: By default the commit policy is to commit at each request and not have
+- * a group commit. The following code needs to change to support group commit.
+- * The code for group commit has not been tested thoroughly and is under
+- * development.
+- */
+-class BasicCommitResolver implements ICommitResolver {
+-
+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
+- CommitRequestStatistics commitRequestStatistics) {
+- return true;
+- }
+-
+- public void init(LogManager logManager) {
+- }
+-}
+-
+-class GroupCommitResolver implements ICommitResolver {
+-
+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
+- CommitRequestStatistics commitRequestStatistics) {
+- long maxCommitWait = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
+- long timestamp = commitRequestStatistics.getPageLevelLastCommitRequestTimestamp(pageIndex);
+- if (timestamp == -1) {
+- if (maxCommitWait == 0) {
+- return true;
+- } else {
+- timestamp = System.currentTimeMillis();
+- }
+- }
+- long currenTime = System.currentTimeMillis();
+- if (currenTime - timestamp > maxCommitWait) {
+- return true;
+- }
+- return false;
+- }
+-
+- public void init(LogManager logManager) {
+- GroupCommitHandlerThread groupCommitHandler = new GroupCommitHandlerThread(logManager);
+- groupCommitHandler.setDaemon(true);
+- groupCommitHandler.start();
+- }
+-
+- class GroupCommitHandlerThread extends Thread {
+-
+- private LogManager logManager;
+-
+- public GroupCommitHandlerThread(LogManager logManager) {
+- this.logManager = logManager;
+- setName("Group Commit Handler");
+- }
+-
+- @Override
+- public void run() {
+- int pageIndex = -1;
+- while (true) {
+- pageIndex = logManager.getNextPageInSequence(pageIndex);
+- long lastCommitRequeestTimestamp = logManager.getCommitRequestStatistics()
+- .getPageLevelLastCommitRequestTimestamp(pageIndex);
+- if (lastCommitRequeestTimestamp != -1
+- && System.currentTimeMillis() - lastCommitRequeestTimestamp > logManager
+- .getLogManagerProperties().getGroupCommitWaitPeriod()) {
+- int dirtyCount = logManager.getLogPageOwnershipCount(pageIndex).decrementAndGet();
+- if (dirtyCount == 0) {
+- try {
+- logManager.getLogPageStatus(pageIndex).set(LogManager.PageState.INACTIVE);
+- logManager.getPendingFlushRequests(pageIndex).put(Thread.currentThread());
+- } catch (InterruptedException e) {
+- e.printStackTrace();
+- break;
+- }
+- logManager.getCommitRequestStatistics().committedPage(pageIndex);
+- }
+- }
+- }
+- }
+- }
+-
+-}
+-
+-interface ICommitResolver {
+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
+- CommitRequestStatistics commitRequestStatistics);
+-
+- public void init(LogManager logManager);
+-}
+-
+-/**
+- * Represents a collection of all commit requests by transactions for each log
+- * page. The requests are accumulated until the commit policy triggers a flush
+- * of the corresponding log page. Upon a flush of a page, all commit requests
+- * for the page are cleared.
+- */
+-class CommitRequestStatistics {
+-
+- AtomicInteger[] pageLevelCommitRequestCount;
+- AtomicLong[] pageLevelLastCommitRequestTimestamp;
+-
+- public CommitRequestStatistics(int numPages) {
+- pageLevelCommitRequestCount = new AtomicInteger[numPages];
+- pageLevelLastCommitRequestTimestamp = new AtomicLong[numPages];
+- for (int i = 0; i < numPages; i++) {
+- pageLevelCommitRequestCount[i] = new AtomicInteger(0);
+- pageLevelLastCommitRequestTimestamp[i] = new AtomicLong(-1L);
+- }
+- }
+-
+- public void registerCommitRequest(int pageIndex) {
+- pageLevelCommitRequestCount[pageIndex].incrementAndGet();
+- pageLevelLastCommitRequestTimestamp[pageIndex].set(System.currentTimeMillis());
+- }
+-
+- public long getPageLevelLastCommitRequestTimestamp(int pageIndex) {
+- return pageLevelLastCommitRequestTimestamp[pageIndex].get();
+- }
+-
+- public void committedPage(int pageIndex) {
+- pageLevelCommitRequestCount[pageIndex].set(0);
+- pageLevelLastCommitRequestTimestamp[pageIndex].set(-1L);
+- }
+-
+-}
++}
+\ No newline at end of file
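Read together with the flush loop earlier in this LogManager.java section, the remaining changes reduce commit handling to a simple handshake: a committing transaction puts a flush request for its log page and waits on that page, while the flusher drains the request, writes the page out, and wakes every waiter with notifyAll. A self-contained sketch of the two sides under those assumptions (the queue and page types are illustrative; only the field and method names are taken from the diff):

    import java.util.concurrent.LinkedBlockingQueue;

    // Minimal sketch of the flush/notify handshake suggested by the LogManager changes above.
    class LogFlushHandshakeSketch {
        private final Object[] logPages;
        private final LinkedBlockingQueue<Thread>[] flushRequestQueue;

        @SuppressWarnings("unchecked")
        LogFlushHandshakeSketch(int numPages) {
            logPages = new Object[numPages];
            flushRequestQueue = new LinkedBlockingQueue[numPages];
            for (int i = 0; i < numPages; i++) {
                logPages[i] = new Object();
                flushRequestQueue[i] = new LinkedBlockingQueue<Thread>();
            }
        }

        // Committer side: register a flush request for the page holding the commit record, then wait.
        void awaitFlush(int pageIndex) throws InterruptedException {
            synchronized (logPages[pageIndex]) {
                flushRequestQueue[pageIndex].put(Thread.currentThread());
                logPages[pageIndex].wait(); // woken by notifyAll() once the page is on disk
            }
        }

        // Flusher side: drain one request, write the page out (elided), wake all waiters.
        void flush(int pageToFlush) throws InterruptedException {
            if (flushRequestQueue[pageToFlush].peek() != null) {
                flushRequestQueue[pageToFlush].take();
            }
            // ... write the page buffer to the log file and sync it to disk ...
            synchronized (logPages[pageToFlush]) {
                logPages[pageToFlush].notifyAll(); // release all transactions waiting on this page
            }
        }
    }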
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (working copy)
+@@ -152,6 +152,9 @@
+ case LogType.UPDATE:
+ logTypeDisplay = "UPDATE";
+ break;
++ case LogType.ENTITY_COMMIT:
++ logTypeDisplay = "ENTITY_COMMIT";
++ break;
+ }
+ builder.append(" LSN : ").append(logicalLogLocator.getLsn());
+ builder.append(" Log Type : ").append(logTypeDisplay);
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (working copy)
+@@ -18,5 +18,6 @@
+ public static final byte UPDATE = 0;
+ public static final byte COMMIT = 1;
++ public static final byte ENTITY_COMMIT = 2;
+ }
-@@ -704,46 +1106,109 @@
- private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- BeginFeedStatement bfs = (BeginFeedStatement) stmt;
-- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
-- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
-- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (working copy)
+@@ -1,5 +1,5 @@
+ /*
+- * Copyright 2009-2010 by The Regents of the University of California
++ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+@@ -41,7 +41,7 @@
+ private int logPageSize = 128 * 1024; // 128 KB
+ private int numLogPages = 8; // number of log pages in the log buffer.
+- private long groupCommitWaitPeriod = 0; // time in milliseconds for which a
++ private long groupCommitWaitPeriod = 1; // time in milliseconds for which a
+ // commit record will wait before
+ // the housing page is marked for
+ // flushing.
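The property change above raises groupCommitWaitPeriod from 0 to 1 ms, so a commit request no longer forces an immediate flush; a page is only marked for flushing once the oldest pending commit request on it has waited at least that long. A self-contained sketch of that decision, following the shouldCommitPage and CommitRequestStatistics logic removed from LogManager.java earlier in this diff (the per-page arrays are collapsed to a single page for brevity; names are otherwise taken from the removed code):

    // Sketch of the group-commit decision: flush only after the pending commit
    // request has waited for at least groupCommitWaitPeriod milliseconds.
    class GroupCommitDecisionSketch {
        private final long groupCommitWaitPeriod; // e.g. 1 ms, as configured above
        private long lastCommitRequestTimestamp = -1L; // -1 means "no pending commit request"

        GroupCommitDecisionSketch(long groupCommitWaitPeriodMillis) {
            this.groupCommitWaitPeriod = groupCommitWaitPeriodMillis;
        }

        void registerCommitRequest() {
            lastCommitRequestTimestamp = System.currentTimeMillis();
        }

        void committedPage() {
            lastCommitRequestTimestamp = -1L;
        }

        boolean shouldCommitPage() {
            long timestamp = lastCommitRequestTimestamp;
            if (timestamp == -1) {
                if (groupCommitWaitPeriod == 0) {
                    return true; // no grouping configured: flush on every commit request
                }
                timestamp = System.currentTimeMillis();
            }
            return System.currentTimeMillis() - timestamp > groupCommitWaitPeriod;
        }
    }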
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+===================================================================
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (working copy)
+@@ -184,6 +184,7 @@
+ break;
+ case LogType.COMMIT:
++ case LogType.ENTITY_COMMIT:
+ tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
+ logRecordHelper.getDatasetId(currentLogLocator),
+ logRecordHelper.getPKHashValue(currentLogLocator));
+@@ -218,6 +219,7 @@
+ IIndex index = null;
+ LocalResource localResource = null;
+ ILocalResourceMetadata localResourceMetadata = null;
++ List<Long> resourceIdList = new ArrayList<Long>();
+ //#. get indexLifeCycleManager
+ IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+@@ -272,6 +274,8 @@
+ index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+ localResource.getResourceName(), localResource.getPartition());
+ indexLifecycleManager.register(resourceId, index);
++ indexLifecycleManager.open(resourceId);
++ resourceIdList.add(resourceId);
+ }
-- Dataset dataset;
-- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
-- .getDatasetName().getValue());
-- IDatasetDetails datasetDetails = dataset.getDatasetDetails();
-- if (datasetDetails.getDatasetType() != DatasetType.FEED) {
-- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
-+ try {
-+ BeginFeedStatement bfs = (BeginFeedStatement) stmt;
-+ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
-+
-+ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
-+ .getValue(), bfs.getQuery(), bfs.getVarCounter());
-+
-+ Dataset dataset;
-+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
-+ .getDatasetName().getValue());
-+ IDatasetDetails datasetDetails = dataset.getDatasetDetails();
-+ if (datasetDetails.getDatasetType() != DatasetType.FEED) {
-+ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
-+ + " is not a feed dataset");
-+ }
-+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
-+ cbfs.setQuery(bfs.getQuery());
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ if (compiled.first != null) {
-+ runJob(hcc, compiled.first);
-+ }
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
-- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
-- cbfs.setQuery(bfs.getQuery());
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
-- if (compiled.first != null) {
-- jobsToExecute.add(compiled.first);
-- }
- }
- private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
-- ControlFeedStatement cfs = (ControlFeedStatement) stmt;
-- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
-- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
-- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
-- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ ControlFeedStatement cfs = (ControlFeedStatement) stmt;
-+ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
-+ : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
-+ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
-+ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
-+ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
-+
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ runJob(hcc, jobSpec);
-+
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
-+ }
- }
- private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
- List<JobSpecification> jobsToExecute) throws Exception {
-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
-- if (compiled.first != null) {
-- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
-- jobsToExecute.add(compiled.first);
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ boolean bActiveTxn = true;
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireReadLatch();
-+
-+ try {
-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
-+
-+ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ bActiveTxn = false;
-+
-+ if (compiled.first != null) {
-+ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
-+ runJob(hcc, compiled.first);
-+ }
-+
-+ return queryResult;
-+ } catch (Exception e) {
-+ if (bActiveTxn) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ }
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseReadLatch();
- }
-- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
- }
+ /***************************************************/
+@@ -300,6 +304,7 @@
+ break;
+ case LogType.COMMIT:
++ case LogType.ENTITY_COMMIT:
+ //do nothing
+ break;
+
+@@ -308,6 +313,11 @@
}
++ //close all indexes
++ for (long r : resourceIdList) {
++ indexLifecycleManager.close(r);
+ }
++
+ JobIdFactory.initJobId(maxJobId);
}
+@@ -539,6 +549,7 @@
+ break;
+
+ case LogType.COMMIT:
++ case LogType.ENTITY_COMMIT:
+ undoLSNSet = loserTxnTable.get(tempKeyTxnId);
+ if (undoLSNSet != null) {
+ loserTxnTable.remove(tempKeyTxnId);
- private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
-@@ -768,20 +1233,32 @@
- private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
- ACIDException {
-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
-- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
-- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
-- if (ng != null) {
-- if (!stmtCreateNodegroup.getIfNotExists())
-- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
-- } else {
-- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
-- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
-- for (Identifier id : ncIdentifiers) {
-- ncNames.add(id.getValue());
-+
-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ acquireWriteLatch();
-+
-+ try {
-+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
-+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
-+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
-+ if (ng != null) {
-+ if (!stmtCreateNodegroup.getIfNotExists())
-+ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
-+ } else {
-+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
-+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
-+ for (Identifier id : ncIdentifiers) {
-+ ncNames.add(id.getValue());
-+ }
-+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
-- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+ } catch (Exception e) {
-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-+ throw new AlgebricksException(e);
-+ } finally {
-+ releaseWriteLatch();
}
- }
-
-@@ -791,10 +1268,37 @@
-
- private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
- String indexName, AqlMetadataProvider metadataProvider) throws Exception {
-+ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
-+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+
-+ //#. mark PendingDropOp on the existing index
-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
-+ MetadataManager.INSTANCE.addIndex(
-+ mdTxnCtx,
-+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index
-+ .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
-- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
-- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
-- indexName);
-+ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
-+
-+ //#. commit the existing transaction before calling runJob.
-+ // the caller should begin the transaction before calling this function.
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+
-+ try {
-+ runJob(hcc, jobSpec);
-+ } catch (Exception e) {
-+ //need to create the mdTxnCtx to be aborted by caller properly
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ throw e;
-+ }
-+
-+ //#. begin a new transaction
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+
-+ //#. finally, delete the existing index
-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
- private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
-@@ -803,10 +1307,32 @@
- CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
-- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-- for (JobSpecification spec : jobSpecs)
-- runJob(hcc, spec);
-+ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
-+
-+ //#. mark PendingDropOp on the existing dataset
-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(),
-+ ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
-+
-+ //#. commit the transaction
-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-+
-+ //#. run the job
-+ try {
-+ runJob(hcc, jobSpec);
-+ } catch (Exception e) {
-+ //need to create the mdTxnCtx to be aborted by caller properly
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
-+ throw e;
-+
-+ //#. start a new transaction
-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
- }
-+
-+ //#. finally, delete the existing dataset.
- MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
- }
-@@ -831,4 +1357,20 @@
- }
- return format;
- }
-+
-+ private void acquireWriteLatch() {
-+ cacheLatch.writeLock().lock();
-+ }
-+
-+ private void releaseWriteLatch() {
-+ cacheLatch.writeLock().unlock();
-+ }
-+
-+ private void acquireReadLatch() {
-+ cacheLatch.readLock().lock();
-+ }
-+
-+ private void releaseReadLatch() {
-+ cacheLatch.readLock().unlock();
-+ }
- }
+Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java
+===================================================================
+--- asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (revision 1194)
++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (working copy)
+@@ -42,6 +42,16 @@
+ List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
+ for (CompilationUnit cUnit : cUnits) {
+ File testFile = tcCtx.getTestFile(cUnit);
++
++ /*****************
++ if (!testFile.getAbsolutePath().contains("meta09.aql")) {
++ System.out.println(testFile.getAbsolutePath());
++ continue;
+ }
++ System.out.println(testFile.getAbsolutePath());
++ *****************/
++
++
+ File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
+ File actualFile = new File(PATH_ACTUAL + File.separator
+ + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
+Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+===================================================================
+--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1194)
++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
+@@ -95,9 +95,10 @@
+ File testFile = tcCtx.getTestFile(cUnit);
+ /*************** to avoid run failure cases ****************
+- if (!testFile.getAbsolutePath().contains("index-selection/")) {
++ if (!testFile.getAbsolutePath().contains("query-issue205.aql")) {
+ continue;
+ }
++ System.out.println(testFile.getAbsolutePath());
+ ************************************************************/
+
+ File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);