Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (working copy)
@@ -103,12 +103,14 @@
 //for entity-level commit
 if (PKHashVal != -1) {
 transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
+ /*****************************
 try {
 //decrease the transaction reference count on index
 txnContext.decreaseActiveTransactionCountOnIndexes();
 } catch (HyracksDataException e) {
 throw new ACIDException("failed to complete index operation", e);
 }
+ *****************************/
 return;
 }

Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (working copy)
@@ -19,6 +19,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;

 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback;
@@ -169,5 +170,14 @@
 closeable.close(this);
 }
 }
+
+ @Override
+ public int hashCode() {
+ return jobId.getId();
+ }

+ @Override
+ public boolean equals(Object o) {
+ return (o == this);
+ }
 }
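The hashCode()/equals() pair added above is what lets a TransactionContext serve as a key of the per-page activeTxnCountMaps (HashMap<TransactionContext, Integer>) introduced in the LogManager changes below: hash by job id, compare by identity. A minimal sketch of that contract, using an illustrative stand-in class (TxnKey is not part of the patch):

import java.util.HashMap;
import java.util.Map;

class TxnKey {
    private final int jobId;

    TxnKey(int jobId) {
        this.jobId = jobId;
    }

    @Override
    public int hashCode() {
        return jobId; // stable bucket per job, mirroring jobId.getId() above
    }

    @Override
    public boolean equals(Object o) {
        return o == this; // identity: distinct contexts never compare equal
    }

    public static void main(String[] args) {
        Map<TxnKey, Integer> activeTxnCount = new HashMap<TxnKey, Integer>();
        TxnKey a = new TxnKey(7);
        TxnKey b = new TxnKey(7); // same jobId, different instance
        activeTxnCount.put(a, 1);
        activeTxnCount.put(b, 2);
        System.out.println(activeTxnCount.size()); // 2: identity keeps them apart
    }
}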
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (working copy)
@@ -567,7 +567,7 @@
 if (commitFlag) {
 if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
 try {
- txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(),
+ txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(),
 entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
 } catch (ACIDException e) {
 try {
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (working copy)
@@ -75,6 +75,7 @@
 buffer.position(0);
 buffer.limit(size);
 fileChannel.write(buffer);
+ fileChannel.force(false);
 erase();
 }

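The one-line addition above is the durability half of the flush: FileChannel.write() alone only hands bytes to the OS page cache. A minimal sketch of the write-then-force pattern, assuming a scratch file path of our choosing:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class ForceDemo {
    public static void main(String[] args) throws IOException {
        RandomAccessFile raf = new RandomAccessFile("scratch.log", "rw");
        FileChannel fileChannel = raf.getChannel();
        try {
            ByteBuffer buffer = ByteBuffer.wrap("log record".getBytes());
            fileChannel.write(buffer);  // may still sit in the OS page cache
            fileChannel.force(false);   // sync file data; false skips file metadata
        } finally {
            fileChannel.close();
            raf.close();
        }
    }
}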
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (working copy)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
@@ -21,7 +21,12 @@
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -30,22 +35,25 @@

 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

 public class LogManager implements ILogManager {

 public static final boolean IS_DEBUG_MODE = false;//true
 private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
- private TransactionSubsystem provider;
+ private final TransactionSubsystem provider;
 private LogManagerProperties logManagerProperties;
+ private LogPageFlushThread logPageFlusher;

 /*
 * the array of log pages. The number of log pages is configurable. Pages
 * taken together form an in-memory log buffer.
 */
-
 private IFileBasedBuffer[] logPages;

 private ILogRecordHelper logRecordHelper;
@@ -54,6 +62,7 @@
 * Number of log pages that constitute the in-memory log buffer.
 */
 private int numLogPages;
+
 /*
 * Initially all pages have an owner count of 1 that is the LogManager. When
 * a transaction requests to write in a log page, the owner count is
@@ -62,12 +71,11 @@
 * (covering the whole log record). When the content has been put, the log
 * manager computes the checksum and puts it after the content. At this
 * point, the ownership count is decremented as the transaction is done with
- * using the page. When a page is full, the log manager decrements the count
- * by one indicating that it has released its ownership of the log page.
- * There could be other transaction(s) still owning the page (that is they
- * could still be mid-way putting the log content). When the ownership count
- * eventually reaches zero, the thread responsible for flushing the log page
- * is notified and the page is flushed to disk.
+ * using the page. When a page is requested to be flushed, the logPageFlusher
+ * sets the count to 0 (LOG_FLUSHER, meaning that the page is being flushed)
+ * only when the count is 1 (LOG_WRITER, meaning that no other transaction
+ * still owns the page for writing logs). After flushing the page, the
+ * logPageFlusher sets the count back to 1.
 */
 private AtomicInteger[] logPageOwnerCount;

@@ -78,18 +86,16 @@

 /*
 * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
- * page is maintained in a map called logPageStatus. A page is ACTIVE when
- * the LogManager can allocate space in the page for writing a log record.
- * Initially all pages are ACTIVE. As transactions fill up space by writing
- * log records, a page may not have sufficient space left for serving a
- * request by a transaction. When this happens, the page is marked INACTIVE.
- * An INACTIVE page with no owners ( logPageOwnerCount.get(<pageIndex>) ==
- * 0) indicates that the page must be flushed to disk before any other log
- * record is written on the page.F
+ * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
+ * can allocate space in the page for writing a log record. Initially all
+ * pages are ACTIVE. As transactions fill up space by writing log records,
+ * a page may not have sufficient space left for serving a request by a
+ * transaction. When this happens, the page is flushed to disk by calling
+ * logPageFlusher.requestFlush(). In requestFlush(), after groupCommitWaitTime,
+ * the page status is set to INACTIVE. Then, once there is no more writer on the
+ * page (meaning the corresponding logPageOwnerCount is 1), the page is flushed
+ * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher.
 */
-
- // private Map<Integer, Integer> logPageStatus = new
- // ConcurrentHashMap<Integer, Integer>();
 private AtomicInteger[] logPageStatus;

 static class PageState {
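A minimal sketch of the owner-count handshake described in the two comment blocks above, assuming the encoding the patch implies (LOG_WRITER == 1, LOG_FLUSHER == 0); the class and variable names here are illustrative:

import java.util.concurrent.atomic.AtomicInteger;

public class PageOwnershipDemo {
    static final int LOG_FLUSHER = 0; // page claimed by the flusher
    static final int LOG_WRITER = 1;  // base ownership held by the LogManager

    public static void main(String[] args) {
        AtomicInteger ownerCount = new AtomicInteger(LOG_WRITER);

        // writer side: claim a slot, write the record, release the slot
        ownerCount.incrementAndGet();
        // ... put log content and checksum here ...
        ownerCount.decrementAndGet();

        // flusher side: wait until only the base owner remains
        while (ownerCount.get() != LOG_WRITER) {
            Thread.yield(); // the patch uses sleep(0)
        }
        ownerCount.set(LOG_FLUSHER); // page is now being flushed
        // ... write the page to disk here ...
        ownerCount.set(LOG_WRITER);  // hand ownership back
        System.out.println("owner count restored to " + ownerCount.get());
    }
}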
178@@ -98,41 +104,8 @@
kisskysc98f0fc2013-01-26 09:14:15 +0000179 }
180
kisskysdf137512013-02-20 07:12:33 +0000181 private AtomicLong lastFlushedLsn = new AtomicLong(-1);
182- private AtomicInteger lastFlushedPage = new AtomicInteger(-1);
kisskysc98f0fc2013-01-26 09:14:15 +0000183
kisskysdf137512013-02-20 07:12:33 +0000184 /*
185- * pendingFlushRequests is a map with key as Integer denoting the page
186- * index. When a (transaction) thread discovers the need to flush a page, it
187- * puts its Thread object into the corresponding value that is a
188- * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
189- * this map in order of page index (and circling around). The flusher thread
190- * needs to flush pages in order and waits for a thread to deposit an object
191- * in the blocking queue corresponding to the next page in order. A request
192- * to flush a page is conveyed to the flush thread by simply depositing an
193- * object in to corresponding blocking queue. It is blocking in the sense
194- * that the flusher thread will continue to wait for an object to arrive in
195- * the queue. The object itself is ignored by the fliusher and just acts as
196- * a signal/event that a page needs to be flushed.
197- */
198-
199- private LinkedBlockingQueue[] pendingFlushRequests;
200-
201- /*
202- * ICommitResolver is an interface that provides an API that can answer a
203- * simple boolean - Given the commit requests so far, should a page be
204- * flushed. The implementation of the interface contains the logic (or you
205- * can say the policy) for commit. It could be group commit in which case
206- * the commit resolver may not return a true indicating that it wishes to
207- * delay flushing of the page.
208- */
209- private ICommitResolver commitResolver;
210-
211- /*
212- * An object that keeps track of the submitted commit requests.
213- */
214- private CommitRequestStatistics commitRequestStatistics;
215-
216- /*
217 * When the transaction eco-system comes to life, the log manager positions
218 * itself to the end of the last written log. the startingLsn represent the
219 * lsn value of the next log record to be written after a system (re)start.
220@@ -146,16 +119,10 @@
221 */
222 private AtomicLong lsn = new AtomicLong(0);
kisskysc98f0fc2013-01-26 09:14:15 +0000223
kisskysdf137512013-02-20 07:12:33 +0000224- /*
225- * A map that tracks the flush requests submitted for each page. The
226- * requests for a page are cleared when the page is flushed.
227- */
228- public LinkedBlockingQueue<Thread> getPendingFlushRequests(int pageIndex) {
229- return pendingFlushRequests[pageIndex];
230- }
231+ private List<HashMap<TransactionContext, Integer>> activeTxnCountMaps;
kisskysc98f0fc2013-01-26 09:14:15 +0000232
kisskysdf137512013-02-20 07:12:33 +0000233- public void addFlushRequest(int pageIndex) {
234- pendingFlushRequests[pageIndex].add(pendingFlushRequests);
235+ public void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) {
236+ logPageFlusher.requestFlush(pageIndex, lsn, isSynchronous);
kisskysc98f0fc2013-01-26 09:14:15 +0000237 }

 public AtomicLong getLastFlushedLsn() {
@@ -233,19 +200,12 @@
 numLogPages = logManagerProperties.getNumLogPages();
 logPageOwnerCount = new AtomicInteger[numLogPages];
 logPageStatus = new AtomicInteger[numLogPages];
- pendingFlushRequests = new LinkedBlockingQueue[numLogPages];
- if (logManagerProperties.getGroupCommitWaitPeriod() > 0) { // configure
- // the
- // Commit
- // Resolver
- commitResolver = new GroupCommitResolver(); // Group Commit is
- // enabled
- commitRequestStatistics = new CommitRequestStatistics(numLogPages);
- } else {
- commitResolver = new BasicCommitResolver(); // the basic commit
- // resolver
+
+ activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
+ for (int i = 0; i < numLogPages; i++) {
+ activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
 }
- this.commitResolver.init(this); // initialize the commit resolver
+
 logPages = new FileBasedBuffer[numLogPages];

 /*
@@ -264,7 +224,6 @@
 for (int i = 0; i < numLogPages; i++) {
 logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
 logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
- pendingFlushRequests[i] = new LinkedBlockingQueue<Thread>();
 }

 /*
@@ -278,9 +237,9 @@
 * daemon thread so that it does not stop the JVM from exiting when all
 * other threads are done with their work.
 */
- LogPageFlushThread logFlusher = new LogPageFlushThread(this);
- logFlusher.setDaemon(true);
- logFlusher.start();
+ logPageFlusher = new LogPageFlushThread(this);
+ logPageFlusher.setDaemon(true);
+ logPageFlusher.start();
 }

 public int getLogPageIndex(long lsnValue) {
@@ -312,7 +271,7 @@
 */
 private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
 if (logPageStatus[pageIndex].get() == PageState.ACTIVE
- && getLogPageOwnershipCount(pageIndex).get() >= PageOwnershipStatus.LOG_WRITER) {
+ && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
 return;
 }
 try {
@@ -338,47 +297,40 @@
 */
 private long getLsn(int entrySize, byte logType) throws ACIDException {
 long pageSize = logManagerProperties.getLogPageSize();
- boolean requiresFlushing = logType == LogType.COMMIT;
+
 while (true) {
 boolean forwardPage = false;
- boolean shouldFlushPage = false;
 long old = lsn.get();
- int pageIndex = getLogPageIndex(old); // get the log page
- // corresponding to the
- // current lsn value
+
+ //get the log page corresponding to the current lsn value
+ int pageIndex = getLogPageIndex(old);
 long retVal = old;
- long next = old + entrySize; // the lsn value for the next request,
- // if the current request is served.
+
+ // the lsn value for the next request if the current request is served.
+ long next = old + entrySize;
 int prevPage = -1;
- if ((next - 1) / pageSize != old / pageSize // check if the log
- // record will cross
- // page boundaries, a
- // case that is not
- // allowed.
- || (next % pageSize == 0)) {
+
+ // check if the log record will cross page boundaries, a case that is not allowed.
+ if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
+
 if ((old != 0 && old % pageSize == 0)) {
- retVal = old; // On second thought, this shall never be the
- // case as it means that the lsn is
- // currently at the beginning of a page and
- // we still need to forward the page which
- // means that the entrySize exceeds a log
- // page size. If this is the case, an
- // exception is thrown before calling this
- // API.
- // would remove this case.
+ // On second thought, this shall never be the case as it means that the lsn is
+ // currently at the beginning of a page and we still need to forward the page which
+ // means that the entrySize exceeds a log page size. If this is the case, an
+ // exception is thrown before calling this API. would remove this case.
+ retVal = old;

 } else {
- retVal = ((old / pageSize) + 1) * pageSize; // set the lsn
- // to point to
- // the beginning
- // of the next
- // page.
+ // set the lsn to point to the beginning of the next page.
+ retVal = ((old / pageSize) + 1) * pageSize;
 }
+
 next = retVal;
- forwardPage = true; // as the log record shall cross log page
- // boundary, we must re-assign the lsn (so
- // that the log record begins on a different
- // location.
+
+ // as the log record shall cross log page boundary, we must re-assign the lsn so
+ // that the log record begins on a different location.
+ forwardPage = true;
+
 prevPage = pageIndex;
 pageIndex = getNextPageInSequence(pageIndex);
 }
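A minimal sketch of the lock-free LSN allocation loop this method implements, reduced to the boundary check and the compareAndSet retry; the real code also forwards pages and coordinates with the flusher, so this is an illustration, not the patch's method:

import java.util.concurrent.atomic.AtomicLong;

public class LsnAllocatorDemo {
    static final AtomicLong lsn = new AtomicLong(0);
    static final long PAGE_SIZE = 128 * 1024;

    // Reserve entrySize bytes of log space; retry if another thread wins the race.
    static long allocate(int entrySize) {
        while (true) {
            long observed = lsn.get();
            long start = observed;
            long next = start + entrySize;
            // a record must not straddle a page boundary
            if ((next - 1) / PAGE_SIZE != start / PAGE_SIZE || next % PAGE_SIZE == 0) {
                start = ((observed / PAGE_SIZE) + 1) * PAGE_SIZE; // skip to next page
                next = start + entrySize;
            }
            if (lsn.compareAndSet(observed, next)) {
                return start; // this thread owns [start, next)
            }
            // lost the race: another thread advanced lsn; re-read and retry
        }
    }

    public static void main(String[] args) {
        System.out.println(allocate(100)); // 0
        System.out.println(allocate(100)); // 100
    }
}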
@@ -397,109 +349,51 @@
 */
 waitUntillPageIsAvailableForWritingLog(pageIndex);

- if (!forwardPage && requiresFlushing) {
- shouldFlushPage = commitResolver.shouldCommitPage(pageIndex, this, commitRequestStatistics);
- if (shouldFlushPage) {
- next = ((next / pageSize) + 1) * pageSize; /*
- * next
- * represents the
- * next value of
- * lsn after this
- * log record has
- * been written.
- * If the page
- * needs to be
- * flushed, then
- * we do not give
- * any more LSNs
- * from this
- * page.
- */
- }
- }
- if (!lsn.compareAndSet(old, next)) { // Atomic call -> returns true
- // only when the value
- // represented by lsn is same as
- // "old". The value is updated
- // to "next".
+ if (!lsn.compareAndSet(old, next)) {
+ // Atomic call -> returns true only when the value represented by lsn is same as
+ // "old". The value is updated to "next".
 continue;
 }

 if (forwardPage) {
- //TODO
- //this is not safe since the incoming thread may reach the same page slot with this page
- //(differ by the log buffer size)
- logPageStatus[prevPage].set(PageState.INACTIVE); // mark
- // previous
- // page
- // inactive
+ addFlushRequest(prevPage, old, false);

- /*
- * decrement on the behalf of the log manager. if there are no
- * more owners (count == 0) the page must be marked as a
- * candidate to be flushed.
- */
- int pageDirtyCount = getLogPageOwnershipCount(prevPage).decrementAndGet();
- if (pageDirtyCount == 0) {
- addFlushRequest(prevPage);
- }
-
- /*
- * The transaction thread that discovers the need to forward a
- * page is made to re-acquire a lsn.
- */
+ // The transaction thread that discovers the need to forward a
+ // page is made to re-acquire a lsn.
 continue;
+
 } else {
- /*
- * the transaction thread has been given a space in a log page,
- * but is made to wait until the page is available.
- */
+ // the transaction thread has been given a space in a log page,
+ // but is made to wait until the page is available.
+ // (Is this needed? when does this wait happen?)
 waitUntillPageIsAvailableForWritingLog(pageIndex);
- /*
- * increment the counter as the transaction thread now holds a
- * space in the log page and hence is an owner.
- */
+
+ // increment the counter as the transaction thread now holds a
+ // space in the log page and hence is an owner.
 logPageOwnerCount[pageIndex].incrementAndGet();
- }
- if (requiresFlushing) {
- if (!shouldFlushPage) {
- /*
- * the log record requires the page to be flushed but under
- * the commit policy, the flush task has been deferred. The
- * transaction thread submits its request to flush the page.
- */
- commitRequestStatistics.registerCommitRequest(pageIndex);
- } else {
- /*
- * the flush request was approved by the commit resolver.
- * Thus the page is marked INACTIVE as no more logs will be
- * written on this page. The log manager needs to release
- * its ownership. Note that transaction threads may still
- * continue to be owners of the log page till they fill up
- * the space allocated to them.
- */
- logPageStatus[pageIndex].set(PageState.INACTIVE);
- logPageOwnerCount[pageIndex].decrementAndGet(); // on
- // the
- // behalf
- // of
- // log
- // manager
+
+ // Before the count is incremented, if the flusher has already flushed the allocated page,
+ // then retry to get a new LSN. Otherwise, the log record with the allocated lsn will be lost.
+ if (lastFlushedLsn.get() >= retVal) {
+ logPageOwnerCount[pageIndex].decrementAndGet();
+ continue;
 }
 }
+
 return retVal;
 }
 }

 @Override
- public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
+ public void log(byte logType, TransactionContext txnCtx, int datasetId, int PKHashValue, long resourceId,
 byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
 LogicalLogLocator logicalLogLocator) throws ACIDException {
- /*
- * logLocator is a re-usable object that is appropriately set in each
- * invocation. If the reference is null, the log manager must throw an
- * exception
- */
+
+ HashMap<TransactionContext, Integer> map = null;
+ int activeTxnCount;
+
+ // logLocator is a re-usable object that is appropriately set in each invocation.
+ // If the reference is null, the log manager must throw an exception.
 if (logicalLogLocator == null) {
 throw new ACIDException(
 " you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +"
@@ -519,20 +413,19 @@

 // all constraints checked and we are good to go and acquire a lsn.
 long previousLSN = -1;
- long currentLSN; // the will be set to the location (a long value)
- // where the log record needs to be placed.

- /*
- * The logs written by a transaction need to be linked to each other for
- * a successful rollback/recovery. However there could be multiple
- * threads operating concurrently that are part of a common transaction.
- * These threads need to synchronize and record the lsn corresponding to
- * the last log record written by (any thread of) the transaction.
- */
- synchronized (context) {
- previousLSN = context.getLastLogLocator().getLsn();
+ // this will be set to the location (a long value) where the log record needs to be placed.
+ long currentLSN;
+
+ // The logs written by a transaction need to be linked to each other for
+ // a successful rollback/recovery. However there could be multiple
+ // threads operating concurrently that are part of a common transaction.
+ // These threads need to synchronize and record the lsn corresponding to
+ // the last log record written by (any thread of) the transaction.
+ synchronized (txnCtx) {
+ previousLSN = txnCtx.getLastLogLocator().getLsn();
 currentLSN = getLsn(totalLogSize, logType);
- context.setLastLSN(currentLSN);
+ txnCtx.setLastLSN(currentLSN);
 if (IS_DEBUG_MODE) {
 System.out.println("--------------> LSN(" + currentLSN + ") is allocated");
 }
@@ -547,48 +440,37 @@
 * performed correctly that is ownership is released.
 */

- boolean decremented = false; // indicates if the transaction thread
- // has release ownership of the
- // page.
- boolean addedFlushRequest = false; // indicates if the transaction
- // thread has submitted a flush
- // request.
+ // indicates if the transaction thread has released ownership of the page.
+ boolean decremented = false;

 int pageIndex = (int) getLogPageIndex(currentLSN);

- /*
- * the lsn has been obtained for the log record. need to set the
- * LogLocator instance accordingly.
- */
-
+ // the lsn has been obtained for the log record. need to set the
+ // LogLocator instance accordingly.
 try {
-
 logicalLogLocator.setBuffer(logPages[pageIndex]);
 int pageOffset = getLogPageOffset(currentLSN);
 logicalLogLocator.setMemoryOffset(pageOffset);

- /*
- * write the log header.
- */
- logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
+ // write the log header.
+ logRecordHelper.writeLogHeader(logicalLogLocator, logType, txnCtx, datasetId, PKHashValue, previousLSN,
 resourceId, resourceMgrId, logContentSize);

 // increment the offset so that the transaction can fill up the
 // content in the correct region of the allocated space.
 logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));

- // a COMMIT log record does not have any content
- // and hence the logger (responsible for putting the log content) is
- // not invoked.
+ // a COMMIT log record does not have any content and hence
+ // the logger (responsible for putting the log content) is not invoked.
 if (logContentSize != 0) {
- logger.preLog(context, reusableLogContentObject);
+ logger.preLog(txnCtx, reusableLogContentObject);
 }

 if (logContentSize != 0) {
 // call the logger implementation and ask to fill in the log
 // record content at the allocated space.
- logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
- logger.postLog(context, reusableLogContentObject);
+ logger.log(txnCtx, logicalLogLocator, logContentSize, reusableLogContentObject);
+ logger.postLog(txnCtx, reusableLogContentObject);
 if (IS_DEBUG_MODE) {
 logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset()
 - logRecordHelper.getLogHeaderSize(logType));
@@ -597,10 +479,8 @@
 }
 }

- /*
- * The log record has been written. For integrity checks, compute
- * the checksum and put it at the end of the log record.
- */
+ // The log record has been written. For integrity checks, compute
+ // the checksum and put it at the end of the log record.
 int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
 int length = totalLogSize - logRecordHelper.getLogChecksumSize();
 long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
@@ -611,46 +491,31 @@
 System.out.println("--------------> LSN(" + currentLSN + ") is written");
 }

- /*
- * release the ownership as the log record has been placed in
- * created space.
- */
- int pageDirtyCount = logPageOwnerCount[pageIndex].decrementAndGet();
+ // release the ownership as the log record has been placed in created space.
+ logPageOwnerCount[pageIndex].decrementAndGet();

 // indicating that the transaction thread has released ownership
 decremented = true;

- /*
- * If the transaction thread happens to be the last owner of the log
- * page the page must by marked as a candidate to be flushed.
- */
- if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) {
- addFlushRequest(pageIndex);
- addedFlushRequest = true;
- }
-
- /*
- * If the log type is commit, a flush request is registered, if the
- * log record has not reached the disk. It may be possible that this
- * thread does not get CPU cycles and in-between the log record has
- * been flushed to disk because the containing log page filled up.
- */
- if (logType == LogType.COMMIT) {
- synchronized (logPages[pageIndex]) {
- while (getLastFlushedLsn().get() < currentLSN) {
- logPages[pageIndex].wait();
- }
+ if (logType == LogType.ENTITY_COMMIT) {
+ map = activeTxnCountMaps.get(pageIndex);
+ if (map.containsKey(txnCtx)) {
+ activeTxnCount = (Integer) map.get(txnCtx);
+ activeTxnCount++;
+ map.put(txnCtx, activeTxnCount);
+ } else {
+ map.put(txnCtx, 1);
 }
+ addFlushRequest(pageIndex, currentLSN, false);
+ } else if (logType == LogType.COMMIT) {
+ addFlushRequest(pageIndex, currentLSN, true);
 }

 } catch (Exception e) {
 e.printStackTrace();
- throw new ACIDException(context, "Thread: " + Thread.currentThread().getName()
+ throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
 + " logger encountered exception", e);
 } finally {
- /*
- * If an exception was encountered and we did not release ownership
- */
 if (!decremented) {
 logPageOwnerCount[pageIndex].decrementAndGet();
 }
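A minimal sketch of the commit-type dispatch added above, with a hypothetical stand-in for addFlushRequest(): ENTITY_COMMIT requests an asynchronous group flush, while COMMIT blocks until its LSN is durable. Names other than the two constants are illustrative:

public class CommitDispatchDemo {
    static final byte COMMIT = 1;
    static final byte ENTITY_COMMIT = 2; // matches the constant added in LogType below

    // Hypothetical flush hook standing in for LogManager.addFlushRequest(...).
    static void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) {
        System.out.println("flush page " + pageIndex + " up to LSN " + lsn
                + (isSynchronous ? " (wait for disk)" : " (fire and forget)"));
    }

    static void onLogWritten(byte logType, int pageIndex, long currentLSN) {
        if (logType == ENTITY_COMMIT) {
            // entity commits are cheap: let the group-commit window batch them
            addFlushRequest(pageIndex, currentLSN, false);
        } else if (logType == COMMIT) {
            // a job commit must not return before its record is on disk
            addFlushRequest(pageIndex, currentLSN, true);
        }
    }

    public static void main(String[] args) {
        onLogWritten(ENTITY_COMMIT, 0, 1024);
        onLogWritten(COMMIT, 0, 2048);
    }
}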
@@ -667,9 +532,6 @@

 logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
 logManagerProperties.getLogPageSize());
-
- //TODO Check if this is necessary
- //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0);
 }

 @Override
@@ -747,16 +609,13 @@
 //minimize memory allocation overhead. current code allocates the log page size per reading a log record.

 byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
- // take a lock on the log page so that the page is not flushed to
- // disk interim
+
+ // take a lock on the log page so that the page is not flushed to disk interim
 synchronized (logPages[pageIndex]) {
- if (lsnValue > getLastFlushedLsn().get()) { // need to check
- // again
- // (this
- // thread may have got
- // de-scheduled and must
- // refresh!)

+ // need to check again (this thread may have got de-scheduled and must refresh!)
+ if (lsnValue > getLastFlushedLsn().get()) {
+
 // get the log record length
 logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
 byte logType = pageContent[pageOffset + 4];
@@ -765,9 +624,7 @@
 int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
 logRecord = new byte[logRecordSize];

- /*
- * copy the log record content
- */
+ // copy the log record content
 System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
 MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
 if (logicalLogLocator == null) {
@@ -790,9 +647,7 @@
 }
 }

- /*
- * the log record is residing on the disk, read it from there.
- */
+ // the log record is residing on the disk, read it from there.
 readDiskLog(lsnValue, logicalLogLocator);
 }

@@ -860,30 +715,40 @@
 return logPageOwnerCount[pageIndex];
 }

- public ICommitResolver getCommitResolver() {
- return commitResolver;
- }
-
- public CommitRequestStatistics getCommitRequestStatistics() {
- return commitRequestStatistics;
- }
-
 public IFileBasedBuffer[] getLogPages() {
 return logPages;
 }

- public int getLastFlushedPage() {
- return lastFlushedPage.get();
- }
-
- public void setLastFlushedPage(int lastFlushedPage) {
- this.lastFlushedPage.set(lastFlushedPage);
- }
-
 @Override
 public TransactionSubsystem getTransactionSubsystem() {
 return provider;
 }
+
+ public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
+ TransactionContext ctx = null;
+ int count = 0;
+ int i = 0;
+
+ HashMap<TransactionContext, Integer> map = activeTxnCountMaps.get(pageIndex);
+ Set<Map.Entry<TransactionContext, Integer>> entrySet = map.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<TransactionContext, Integer> entry : entrySet) {
+ if (entry != null) {
+ if (entry.getValue() != null) {
+ count = entry.getValue();
+ }
+ if (count > 0) {
+ ctx = entry.getKey();
+ for (i = 0; i < count; i++) {
+ ctx.decreaseActiveTransactionCountOnIndexes();
+ }
+ }
+ }
+ }
+ }
+
+ map.clear();
+ }
 }

 /*
@@ -895,36 +760,82 @@
 class LogPageFlushThread extends Thread {

 private LogManager logManager;
+ /*
+ * pendingFlushRequests is a map with key as Integer denoting the page
+ * index. When a (transaction) thread discovers the need to flush a page, it
+ * puts its Thread object into the corresponding value that is a
+ * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
+ * this map in order of page index (and circling around). The flusher thread
+ * needs to flush pages in order and waits for a thread to deposit an object
+ * in the blocking queue corresponding to the next page in order. A request
+ * to flush a page is conveyed to the flush thread by simply depositing an
+ * object into the corresponding blocking queue. It is blocking in the sense
+ * that the flusher thread will continue to wait for an object to arrive in
+ * the queue. The object itself is ignored by the flusher and just acts as
+ * a signal/event that a page needs to be flushed.
+ */
+ private final LinkedBlockingQueue<Object>[] flushRequestQueue;
+ private final Object[] flushRequests;
+ private int lastFlushedPageIndex;
+ private final long groupCommitWaitPeriod;

 public LogPageFlushThread(LogManager logManager) {
 this.logManager = logManager;
 setName("Flusher");
+ int numLogPages = logManager.getLogManagerProperties().getNumLogPages();
+ this.flushRequestQueue = new LinkedBlockingQueue[numLogPages];
+ this.flushRequests = new Object[numLogPages];
+ for (int i = 0; i < numLogPages; i++) {
+ flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
+ flushRequests[i] = new Object();
+ }
+ this.lastFlushedPageIndex = -1;
+ groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
 }

+ public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
+ synchronized (logManager.getLogPage(pageIndex)) {
+ //return if flushedLSN >= lsn
+ if (logManager.getLastFlushedLsn().get() >= lsn) {
+ return;
+ }
+
+ //put a new request into the queue only if a request for the page is not already in the queue.
+ flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]);
+
+ //return if the request is asynchronous
+ if (!isSynchronous) {
+ return;
+ }
+
+ //wait until the page is flushed.
+ boolean isNotified = false;
+ while (!isNotified) {
+ try {
+ logManager.getLogPage(pageIndex).wait();
+ isNotified = true;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
 @Override
 public void run() {
 while (true) {
 try {
- int pageToFlush = logManager.getNextPageInSequence(logManager.getLastFlushedPage());
+ int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex);

- /*
- * A wait call on the linkedBLockingQueue. The flusher thread is
- * notified when an object is added to the queue. Please note
- * that each page has an associated blocking queue.
- */
- logManager.getPendingFlushRequests(pageToFlush).take();
+ // A wait call on the LinkedBlockingQueue. The flusher thread is
+ // notified when an object is added to the queue. Please note
+ // that each page has an associated blocking queue.
+ flushRequestQueue[pageToFlush].take();

- /*
- * The LogFlusher was waiting for a page to be marked as a
- * candidate for flushing. Now that has happened. The thread
- * shall proceed to take a lock on the log page
- */
- synchronized (logManager.getLogPages()[pageToFlush]) {
+ synchronized (logManager.getLogPage(pageToFlush)) {

- /*
- * lock the internal state of the log manager and create a
- * log file if necessary.
- */
+ // lock the internal state of the log manager and create a
+ // log file if necessary.
 int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get());
 int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()
 + logManager.getLogManagerProperties().getLogPageSize());
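A minimal sketch of the signaling scheme requestFlush() relies on: a LinkedBlockingQueue of capacity 1 coalesces duplicate flush requests for the same page, and offer() never blocks the requester. Self-contained, nothing here beyond java.util.concurrent:

import java.util.concurrent.LinkedBlockingQueue;

public class FlushSignalDemo {
    public static void main(String[] args) throws InterruptedException {
        // one slot: a second request for the same page is simply dropped
        LinkedBlockingQueue<Object> signal = new LinkedBlockingQueue<Object>(1);
        Object token = new Object(); // the token's identity is irrelevant

        System.out.println(signal.offer(token)); // true: request registered
        System.out.println(signal.offer(token)); // false: coalesced duplicate

        // flusher side: take() blocks until some thread deposits a token
        Object received = signal.take();
        System.out.println(received == token); // true
    }
}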
@@ -936,198 +847,60 @@
 logManager.getLogManagerProperties().getLogPageSize());
 }

- logManager.getLogPage(pageToFlush).flush(); // put the
- // content to
- // disk, the
- // thread still
- // has a lock on
- // the log page
+ //#. sleep during the groupCommitWaitPeriod
+ sleep(groupCommitWaitPeriod);

- /*
- * acquire lock on the log manager as we need to update the
- * internal bookkeeping data.
- */
+ //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page.
+ logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);

- // increment the last flushed lsn.
- long lastFlushedLsn = logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties()
- .getLogPageSize());
+ //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER),
+ // meaning everyone has finished writing logs on this page.
+ while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
+ sleep(0);
+ }

- /*
- * the log manager gains back ownership of the page. this is
- * reflected by incrementing the owner count of the page.
- * recall that when the page is begin flushed the owner
- * count is actually 0 Value of zero implicitly indicates
- * that the page is operated upon by the log flusher thread.
- */
- logManager.getLogPageOwnershipCount(pageToFlush).incrementAndGet();
+ //#. set the logPageOwnerCount to 0 (LOG_FLUSHER),
+ // meaning the page is being flushed.
+ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);

- /*
- * get the number of log buffers that have been written so
- * far. A log buffer = number of log pages * size of a log
- * page
- */
- int numCycles = (int) lastFlushedLsn / logManager.getLogManagerProperties().getLogBufferSize();
- if (lastFlushedLsn % logManager.getLogManagerProperties().getLogBufferSize() == 0) {
- numCycles--;
- }
+ // put the content to disk (the thread still has a lock on the log page)
+ logManager.getLogPage(pageToFlush).flush();

- /*
- * Map the log page to a new region in the log file.
- */
+ // increment the last flushed lsn and lastFlushedPage
+ logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
+ lastFlushedPageIndex = pageToFlush;

+ // decrement activeTxnCountOnIndexes
+ logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
+
+ // reset the count to 1
+ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
+
+ // Map the log page to a new region in the log file.
 long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
 + logManager.getLogManagerProperties().getLogBufferSize();

- /*
- * long nextPos = (numCycles + 1)
- * logManager.getLogManagerProperties() .getLogBufferSize()
- * + pageToFlush logManager.getLogManagerProperties()
- * .getLogPageSize();
- */
 logManager.resetLogPage(nextWritePosition, pageToFlush);

 // mark the page as ACTIVE
 logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);

- // notify all waiting (transaction) threads.
- // Transaction thread may be waiting for the page to be
- // available or may have a commit log record on the page
- // that got flushed.
- logManager.getLogPages()[pageToFlush].notifyAll();
- logManager.setLastFlushedPage(pageToFlush);
+ //#. check whether the queue holds another flush request on the same log buffer.
+ // If there is one, simply remove it.
+ if (flushRequestQueue[pageToFlush].peek() != null) {
+ flushRequestQueue[pageToFlush].take();
+ }

+ // notify all waiting (transaction) threads.
+ logManager.getLogPage(pageToFlush).notifyAll();
 }
 } catch (IOException ioe) {
 ioe.printStackTrace();
 throw new Error(" exception in flushing log page", ioe);
 } catch (InterruptedException e) {
 e.printStackTrace();
- break; // must break from the loop as the exception indicates
- // some thing horrendous has happened elsewhere
+ break;
 }
 }
 }
-}
-
-/*
- * TODO: By default the commit policy is to commit at each request and not have
- * a group commit. The following code needs to change to support group commit.
- * The code for group commit has not been tested thoroughly and is under
- * development.
- */
-class BasicCommitResolver implements ICommitResolver {
-
- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
- CommitRequestStatistics commitRequestStatistics) {
- return true;
- }
-
- public void init(LogManager logManager) {
- }
-}
-
-class GroupCommitResolver implements ICommitResolver {
-
- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
- CommitRequestStatistics commitRequestStatistics) {
- long maxCommitWait = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
- long timestamp = commitRequestStatistics.getPageLevelLastCommitRequestTimestamp(pageIndex);
- if (timestamp == -1) {
- if (maxCommitWait == 0) {
- return true;
- } else {
- timestamp = System.currentTimeMillis();
- }
- }
- long currenTime = System.currentTimeMillis();
- if (currenTime - timestamp > maxCommitWait) {
- return true;
- }
- return false;
- }
-
- public void init(LogManager logManager) {
- GroupCommitHandlerThread groupCommitHandler = new GroupCommitHandlerThread(logManager);
- groupCommitHandler.setDaemon(true);
- groupCommitHandler.start();
- }
-
- class GroupCommitHandlerThread extends Thread {
-
- private LogManager logManager;
-
- public GroupCommitHandlerThread(LogManager logManager) {
- this.logManager = logManager;
- setName("Group Commit Handler");
- }
-
- @Override
- public void run() {
- int pageIndex = -1;
- while (true) {
- pageIndex = logManager.getNextPageInSequence(pageIndex);
- long lastCommitRequeestTimestamp = logManager.getCommitRequestStatistics()
- .getPageLevelLastCommitRequestTimestamp(pageIndex);
- if (lastCommitRequeestTimestamp != -1
- && System.currentTimeMillis() - lastCommitRequeestTimestamp > logManager
- .getLogManagerProperties().getGroupCommitWaitPeriod()) {
- int dirtyCount = logManager.getLogPageOwnershipCount(pageIndex).decrementAndGet();
- if (dirtyCount == 0) {
- try {
- logManager.getLogPageStatus(pageIndex).set(LogManager.PageState.INACTIVE);
- logManager.getPendingFlushRequests(pageIndex).put(Thread.currentThread());
- } catch (InterruptedException e) {
- e.printStackTrace();
- break;
- }
- logManager.getCommitRequestStatistics().committedPage(pageIndex);
- }
- }
- }
- }
- }
-
-}
-
-interface ICommitResolver {
- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
- CommitRequestStatistics commitRequestStatistics);
-
- public void init(LogManager logManager);
-}
-
-/**
- * Represents a collection of all commit requests by transactions for each log
- * page. The requests are accumulated until the commit policy triggers a flush
- * of the corresponding log page. Upon a flush of a page, all commit requests
- * for the page are cleared.
- */
-class CommitRequestStatistics {
-
- AtomicInteger[] pageLevelCommitRequestCount;
- AtomicLong[] pageLevelLastCommitRequestTimestamp;
-
- public CommitRequestStatistics(int numPages) {
- pageLevelCommitRequestCount = new AtomicInteger[numPages];
- pageLevelLastCommitRequestTimestamp = new AtomicLong[numPages];
- for (int i = 0; i < numPages; i++) {
- pageLevelCommitRequestCount[i] = new AtomicInteger(0);
- pageLevelLastCommitRequestTimestamp[i] = new AtomicLong(-1L);
- }
- }
-
- public void registerCommitRequest(int pageIndex) {
- pageLevelCommitRequestCount[pageIndex].incrementAndGet();
- pageLevelLastCommitRequestTimestamp[pageIndex].set(System.currentTimeMillis());
- }
-
- public long getPageLevelLastCommitRequestTimestamp(int pageIndex) {
- return pageLevelLastCommitRequestTimestamp[pageIndex].get();
- }
-
- public void committedPage(int pageIndex) {
- pageLevelCommitRequestCount[pageIndex].set(0);
- pageLevelLastCommitRequestTimestamp[pageIndex].set(-1L);
- }
-
-}
+}
\ No newline at end of file
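A condensed, runnable sketch of one flush cycle of the run() loop above, with assumed state encodings (the patch does not show PageState's numeric values) and illustrative names; the ordering is the point: close the page, drain writers, claim, flush, reopen, notify:

import java.util.concurrent.atomic.AtomicInteger;

public class FlushCycleDemo {
    static final int ACTIVE = 0, INACTIVE = 1;      // assumed PageState encoding
    static final int LOG_FLUSHER = 0, LOG_WRITER = 1;

    static final AtomicInteger status = new AtomicInteger(ACTIVE);
    static final AtomicInteger owners = new AtomicInteger(LOG_WRITER);
    static final Object page = new Object();        // stand-in for the log page monitor

    static void flushCycle(long groupCommitWaitPeriod) throws InterruptedException {
        synchronized (page) {
            Thread.sleep(groupCommitWaitPeriod);    // let more commits pile onto the page
            status.set(INACTIVE);                   // no new writers may enter
            while (owners.get() != LOG_WRITER) {    // drain in-flight writers
                Thread.yield();
            }
            owners.set(LOG_FLUSHER);                // claim the page
            // ... write the page to disk and force() it here ...
            owners.set(LOG_WRITER);                 // hand ownership back
            status.set(ACTIVE);                     // reopen the page
            page.notifyAll();                       // wake synchronous committers
        }
    }

    public static void main(String[] args) throws InterruptedException {
        flushCycle(1); // 1 ms window, matching the new LogManagerProperties default
        System.out.println("page reopened: " + (status.get() == ACTIVE));
    }
}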
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (working copy)
@@ -152,6 +152,9 @@
 case LogType.UPDATE:
 logTypeDisplay = "UPDATE";
 break;
+ case LogType.ENTITY_COMMIT:
+ logTypeDisplay = "ENTITY_COMMIT";
+ break;
 }
 builder.append(" LSN : ").append(logicalLogLocator.getLsn());
 builder.append(" Log Type : ").append(logTypeDisplay);
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (working copy)
@@ -18,5 +18,6 @@

 public static final byte UPDATE = 0;
 public static final byte COMMIT = 1;
+ public static final byte ENTITY_COMMIT = 2;

 }
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (working copy)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * you may obtain a copy of the License from
@@ -41,7 +41,7 @@
 private int logPageSize = 128 * 1024; // 128 KB
 private int numLogPages = 8; // number of log pages in the log buffer.

- private long groupCommitWaitPeriod = 0; // time in milliseconds for which a
+ private long groupCommitWaitPeriod = 1; // time in milliseconds for which a
 // commit record will wait before
 // the housing page is marked for
 // flushing.
Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
===================================================================
--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (revision 1194)
+++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (working copy)
@@ -184,6 +184,7 @@
 break;

 case LogType.COMMIT:
+ case LogType.ENTITY_COMMIT:
 tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
 logRecordHelper.getDatasetId(currentLogLocator),
 logRecordHelper.getPKHashValue(currentLogLocator));
@@ -218,6 +219,7 @@
 IIndex index = null;
 LocalResource localResource = null;
 ILocalResourceMetadata localResourceMetadata = null;
+ List<Long> resourceIdList = new ArrayList<Long>();

 //#. get indexLifeCycleManager
 IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
@@ -272,6 +274,8 @@
 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
 localResource.getResourceName(), localResource.getPartition());
 indexLifecycleManager.register(resourceId, index);
+ indexLifecycleManager.open(resourceId);
+ resourceIdList.add(resourceId);
 }

 /***************************************************/
@@ -300,6 +304,7 @@
 break;

 case LogType.COMMIT:
+ case LogType.ENTITY_COMMIT:
 //do nothing
 break;

@@ -308,6 +313,11 @@
 }
 }

+ //close all indexes
+ for (long r : resourceIdList) {
+ indexLifecycleManager.close(r);
+ }
+
 JobIdFactory.initJobId(maxJobId);
 }

@@ -539,6 +549,7 @@
 break;

 case LogType.COMMIT:
+ case LogType.ENTITY_COMMIT:
 undoLSNSet = loserTxnTable.get(tempKeyTxnId);
 if (undoLSNSet != null) {
 loserTxnTable.remove(tempKeyTxnId);
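A minimal sketch of the winner-detection rule these RecoveryManager hunks extend, over a hypothetical one-record log: ENTITY_COMMIT now clears an entity from the loser table exactly as COMMIT clears a job. The key format and values below are illustrative:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class WinnerDemo {
    static final byte UPDATE = 0, COMMIT = 1, ENTITY_COMMIT = 2;

    public static void main(String[] args) {
        // key: hypothetical "jobId:datasetId:pkHash" id; value: LSNs that may need undo
        Map<String, Set<Long>> loserTxnTable = new HashMap<String, Set<Long>>();

        // UPDATE at LSN 10 for entity "1:2:42" -> tentatively a loser
        Set<Long> undoLSNSet = new HashSet<Long>();
        undoLSNSet.add(10L);
        loserTxnTable.put("1:2:42", undoLSNSet);

        byte logType = ENTITY_COMMIT; // as read from the next log record
        if (logType == COMMIT || logType == ENTITY_COMMIT) {
            loserTxnTable.remove("1:2:42"); // committed: nothing to undo
        }
        System.out.println(loserTxnTable.isEmpty()); // true
    }
}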
Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java
===================================================================
--- asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (revision 1194)
+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (working copy)
@@ -42,6 +42,16 @@
 List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
 for (CompilationUnit cUnit : cUnits) {
 File testFile = tcCtx.getTestFile(cUnit);
+
+ /*****************
+ if (!testFile.getAbsolutePath().contains("meta09.aql")) {
+ System.out.println(testFile.getAbsolutePath());
+ continue;
+ }
+ System.out.println(testFile.getAbsolutePath());
+ *****************/
+
+
 File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
 File actualFile = new File(PATH_ACTUAL + File.separator
 + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
===================================================================
--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1194)
+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
@@ -95,9 +95,10 @@
 File testFile = tcCtx.getTestFile(cUnit);

 /*************** to avoid run failure cases ****************
- if (!testFile.getAbsolutePath().contains("index-selection/")) {
+ if (!testFile.getAbsolutePath().contains("query-issue205.aql")) {
 continue;
 }
+ System.out.println(testFile.getAbsolutePath());
 ************************************************************/

 File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
Index: diff_file
===================================================================
--- diff_file (revision 1194)
+++ diff_file (working copy)
@@ -1,2098 +1,1252 @@
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
 ===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DataverseTupleTranslator.java (working copy)
-@@ -25,8 +25,11 @@
- import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
- import edu.uci.ics.asterix.metadata.bootstrap.MetadataRecordTypes;
- import edu.uci.ics.asterix.metadata.entities.Dataverse;
-+import edu.uci.ics.asterix.om.base.AInt32;
-+import edu.uci.ics.asterix.om.base.AMutableInt32;
- import edu.uci.ics.asterix.om.base.ARecord;
- import edu.uci.ics.asterix.om.base.AString;
-+import edu.uci.ics.asterix.om.types.BuiltinType;
- import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
- import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java (working copy)
+@@ -103,12 +103,14 @@
+ //for entity-level commit
+ if (PKHashVal != -1) {
+ transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
++ /*****************************
+ try {
+ //decrease the transaction reference count on index
+ txnContext.decreaseActiveTransactionCountOnIndexes();
+ } catch (HyracksDataException e) {
+ throw new ACIDException("failed to complete index operation", e);
+ }
++ *****************************/
+ return;
+ }

-@@ -40,12 +43,18 @@
- // Payload field containing serialized Dataverse.
- public static final int DATAVERSE_PAYLOAD_TUPLE_FIELD_INDEX = 1;
-
-+ private AMutableInt32 aInt32;
-+ protected ISerializerDeserializer<AInt32> aInt32Serde;
-+
- @SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(MetadataRecordTypes.DATAVERSE_RECORDTYPE);
-
-+ @SuppressWarnings("unchecked")
- public DataverseTupleTranslator(boolean getTuple) {
- super(getTuple, MetadataPrimaryIndexes.DATAVERSE_DATASET.getFieldCount());
-+ aInt32 = new AMutableInt32(-1);
-+ aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- }
-
- @Override
-@@ -57,7 +66,8 @@
- DataInput in = new DataInputStream(stream);
- ARecord dataverseRecord = recordSerDes.deserialize(in);
- return new Dataverse(((AString) dataverseRecord.getValueByPos(0)).getStringValue(),
-- ((AString) dataverseRecord.getValueByPos(1)).getStringValue());
-+ ((AString) dataverseRecord.getValueByPos(1)).getStringValue(),
-+ ((AInt32) dataverseRecord.getValueByPos(2)).getIntegerValue());
- }
-
- @Override
-@@ -88,6 +98,12 @@
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
-
-+ // write field 3
-+ fieldValue.reset();
-+ aInt32.setValue(instance.getPendingOp());
-+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
-+ recordBuilder.addField(MetadataRecordTypes.DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
-+
- recordBuilder.write(tupleBuilder.getDataOutput(), true);
- tupleBuilder.addFieldEndOffset();
-
-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
 ===================================================================
---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (revision 1061)
-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java (working copy)
-@@ -77,9 +77,9 @@
- protected ISerializerDeserializer<AInt32> aInt32Serde;
+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (revision 1194)
++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java (working copy)
+@@ -19,6 +19,7 @@
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
++import java.util.concurrent.atomic.AtomicInteger;

- @SuppressWarnings("unchecked")
-- public DatasetTupleTranslator(boolean getTuple) {
-+ public DatasetTupleTranslator(boolean getTuple) {
- super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
-- aInt32 = new AMutableInt32(-1);
-+ aInt32 = new AMutableInt32(-1);
- aInt32Serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- }
-
-@@ -104,8 +104,10 @@
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATATYPENAME_FIELD_INDEX)).getStringValue();
- DatasetType datasetType = DatasetType.valueOf(((AString) datasetRecord.getValueByPos(3)).getStringValue());
- IDatasetDetails datasetDetails = null;
-- int datasetId = ((AInt32) datasetRecord
-+ int datasetId = ((AInt32) datasetRecord
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX)).getIntegerValue();
-+ int pendingOp = ((AInt32) datasetRecord
-+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX)).getIntegerValue();
- switch (datasetType) {
- case FEED:
- case INTERNAL: {
-@@ -197,7 +199,7 @@
- }
- datasetDetails = new ExternalDatasetDetails(adapter, properties);
+ import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+ import edu.uci.ics.asterix.transaction.management.opcallbacks.AbstractOperationCallback;
+@@ -169,5 +170,14 @@
+ closeable.close(this);
 }
-- return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId);
-+ return new Dataset(dataverseName, datasetName, typeName, datasetDetails, datasetType, datasetId, pendingOp);
1379 }
1380++
1381++ @Override
1382++ public int hashCode() {
1383++ return jobId.getId();
1384++ }
1385
1386- @Override
1387-@@ -248,13 +250,19 @@
1388- aString.setValue(Calendar.getInstance().getTime().toString());
1389- stringSerde.serialize(aString, fieldValue.getDataOutput());
1390- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
1391--
1392-+
1393- // write field 8
1394- fieldValue.reset();
1395- aInt32.setValue(dataset.getDatasetId());
1396- aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
1397- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_DATASETID_FIELD_INDEX, fieldValue);
1398--
1399-+
1400-+ // write field 9
1401-+ fieldValue.reset();
1402-+ aInt32.setValue(dataset.getPendingOp());
1403-+ aInt32Serde.serialize(aInt32, fieldValue.getDataOutput());
1404-+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
1405-+
1406- // write record
1407- recordBuilder.write(tupleBuilder.getDataOutput(), true);
1408- tupleBuilder.addFieldEndOffset();
1409-@@ -290,13 +298,15 @@
1410- fieldValue.reset();
1411- aString.setValue(name);
1412- stringSerde.serialize(aString, fieldValue.getDataOutput());
1413-- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX, fieldValue);
1414-+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_NAME_FIELD_INDEX,
1415-+ fieldValue);
1416-
1417- // write field 1
1418- fieldValue.reset();
1419- aString.setValue(value);
1420- stringSerde.serialize(aString, fieldValue.getDataOutput());
1421-- propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX, fieldValue);
1422-+ propertyRecordBuilder.addField(MetadataRecordTypes.DATASOURCE_ADAPTER_PROPERTIES_ARECORD_VALUE_FIELD_INDEX,
1423-+ fieldValue);
1424-
1425- propertyRecordBuilder.write(out, true);
1426- }
1427-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java
1428++ @Override
1429++ public boolean equals(Object o) {
1430++ return (o == this);
1431++ }
1432+ }
1433+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
1434 ===================================================================
1435---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (revision 1061)
1436-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java (working copy)
1437-@@ -96,13 +96,15 @@
1438- }
1439- Boolean isPrimaryIndex = ((ABoolean) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_ISPRIMARY_FIELD_INDEX))
1440- .getBoolean();
1441-+ int pendingOp = ((AInt32) rec.getValueByPos(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX))
1442-+ .getIntegerValue();
1443- // Check if there is a gram length as well.
1444- int gramLength = -1;
1445- int gramLenPos = rec.getType().findFieldPosition(GRAM_LENGTH_FIELD_NAME);
1446- if (gramLenPos >= 0) {
1447- gramLength = ((AInt32) rec.getValueByPos(gramLenPos)).getIntegerValue();
1448- }
1449-- return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex);
1450-+ return new Index(dvName, dsName, indexName, indexStructure, searchKey, gramLength, isPrimaryIndex, pendingOp);
1451- }
1452-
1453- @Override
1454-@@ -174,7 +176,12 @@
1455- stringSerde.serialize(aString, fieldValue.getDataOutput());
1456- recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_TIMESTAMP_FIELD_INDEX, fieldValue);
1457-
1458-- // write optional field 7
1459-+ // write field 7
1460-+ fieldValue.reset();
1461-+ intSerde.serialize(new AInt32(instance.getPendingOp()), fieldValue.getDataOutput());
1462-+ recordBuilder.addField(MetadataRecordTypes.INDEX_ARECORD_PENDINGOP_FIELD_INDEX, fieldValue);
1463-+
1464-+ // write optional field 8
1465- if (instance.getGramLength() > 0) {
1466- fieldValue.reset();
1467- nameValue.reset();
1468-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
1469+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (revision 1194)
1470++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java (working copy)
1471+@@ -567,7 +567,7 @@
1472+ if (commitFlag) {
1473+ if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
1474+ try {
1475+- txnSubsystem.getLogManager().log(LogType.COMMIT, txnContext, datasetId.getId(),
1476++ txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(),
1477+ entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
1478+ } catch (ACIDException e) {
1479+ try {
1480+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
1481 ===================================================================
1482---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (revision 1061)
1483-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java (working copy)
1484-@@ -129,7 +129,7 @@
1485-
1486- public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
1487- private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
1488-- private final MetadataTransactionContext mdTxnCtx;
1489-+ private MetadataTransactionContext mdTxnCtx;
1490- private boolean isWriteTransaction;
1491- private Map<String, String[]> stores;
1492- private Map<String, String> config;
1493-@@ -156,8 +156,7 @@
1494- return config;
1495+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (revision 1194)
1496++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java (working copy)
1497+@@ -75,6 +75,7 @@
1498+ buffer.position(0);
1499+ buffer.limit(size);
1500+ fileChannel.write(buffer);
1501++ fileChannel.force(false);
1502+ erase();
1503 }
1504
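
The one-line FileBasedBuffer change above is a durability fix: fileChannel.write(buffer) only hands the page to the OS, so a commit acknowledged before the data reaches disk can be lost on a crash; fileChannel.force(false) flushes the file's data (though not its metadata) to stable storage before erase(). A minimal sketch of the write-then-force pattern, assuming a plain append-only page buffer (DurableLogBuffer and its fields are illustrative, not from the patch):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    // Illustrative sketch: a tiny log page that is forced to disk on flush.
    class DurableLogBuffer {
        private final FileChannel fileChannel;
        private final ByteBuffer buffer;

        DurableLogBuffer(Path file, int pageSize) throws IOException {
            this.fileChannel = FileChannel.open(file,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            this.buffer = ByteBuffer.allocate(pageSize);
        }

        void flush(int size) throws IOException {
            buffer.position(0);
            buffer.limit(size);
            while (buffer.hasRemaining()) {
                fileChannel.write(buffer); // may still sit in the OS page cache
            }
            // force(false) = flush file data but not file metadata; without
            // this call a crash can silently drop an acknowledged commit.
            fileChannel.force(false);
            buffer.clear(); // the analogue of erase() above
        }
    }
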
1505-- public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
1506-- this.mdTxnCtx = mdTxnCtx;
1507-+ public AqlMetadataProvider(Dataverse defaultDataverse) {
1508- this.defaultDataverse = defaultDataverse;
1509- this.stores = AsterixProperties.INSTANCE.getStores();
1510- }
1511-@@ -181,6 +180,10 @@
1512- public void setWriterFactory(IAWriterFactory writerFactory) {
1513- this.writerFactory = writerFactory;
1514- }
1515-+
1516-+ public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
1517-+ this.mdTxnCtx = mdTxnCtx;
1518-+ }
1519-
1520- public MetadataTransactionContext getMetadataTxnContext() {
1521- return mdTxnCtx;
1522-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java
1523+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
1524 ===================================================================
1525---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (revision 1061)
1526-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataset.java (working copy)
1527-@@ -35,15 +35,18 @@
1528- private final DatasetType datasetType;
1529- private IDatasetDetails datasetDetails;
1530- private final int datasetId;
1531-+ // Type of pending operations with respect to atomic DDL operation
1532-+ private final int pendingOp;
1533+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (revision 1194)
1534++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java (working copy)
1535+@@ -1,5 +1,5 @@
1536+ /*
1537+- * Copyright 2009-2010 by The Regents of the University of California
1538++ * Copyright 2009-2012 by The Regents of the University of California
1539+ * Licensed under the Apache License, Version 2.0 (the "License");
1540+ * you may not use this file except in compliance with the License.
1541+ * you may obtain a copy of the License from
1542+@@ -21,7 +21,12 @@
1543+ import java.io.RandomAccessFile;
1544+ import java.nio.ByteBuffer;
1545+ import java.nio.channels.FileChannel;
1546++import java.util.ArrayList;
1547++import java.util.HashMap;
1548++import java.util.List;
1549++import java.util.Map;
1550+ import java.util.Properties;
1551++import java.util.Set;
1552+ import java.util.concurrent.LinkedBlockingQueue;
1553+ import java.util.concurrent.atomic.AtomicInteger;
1554+ import java.util.concurrent.atomic.AtomicLong;
1555+@@ -30,22 +35,25 @@
1556
1557- public Dataset(String dataverseName, String datasetName, String itemTypeName, IDatasetDetails datasetDetails,
1558-- DatasetType datasetType, int datasetId) {
1559-+ DatasetType datasetType, int datasetId, int pendingOp) {
1560- this.dataverseName = dataverseName;
1561- this.datasetName = datasetName;
1562- this.itemTypeName = itemTypeName;
1563- this.datasetType = datasetType;
1564- this.datasetDetails = datasetDetails;
1565- this.datasetId = datasetId;
1566-+ this.pendingOp = pendingOp;
1567- }
1568+ import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
1569+ import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
1570++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
1571++import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
1572+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
1573+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
1574+ import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
1575++import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
1576
1577- public String getDataverseName() {
1578-@@ -73,6 +76,10 @@
1579- public int getDatasetId() {
1580- return datasetId;
1581- }
1582-+
1583-+ public int getPendingOp() {
1584-+ return pendingOp;
1585-+ }
1586+ public class LogManager implements ILogManager {
1587
1588- @Override
1589- public Object addToCache(MetadataCache cache) {
1590-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java
1591-===================================================================
1592---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (revision 1061)
1593-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Index.java (working copy)
1594-@@ -45,9 +45,11 @@
1595- private final boolean isPrimaryIndex;
1596- // Specific to NGRAM indexes.
1597- private final int gramLength;
1598-+ // Type of pending operations with respect to atomic DDL operation
1599-+ private final int pendingOp;
1600+ public static final boolean IS_DEBUG_MODE = false;//true
1601+ private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
1602+- private TransactionSubsystem provider;
1603++ private final TransactionSubsystem provider;
1604+ private LogManagerProperties logManagerProperties;
1605++ private LogPageFlushThread logPageFlusher;
1606
1607- public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
1608-- List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex) {
1609-+ List<String> keyFieldNames, int gramLength, boolean isPrimaryIndex, int pendingOp) {
1610- this.dataverseName = dataverseName;
1611- this.datasetName = datasetName;
1612- this.indexName = indexName;
1613-@@ -55,10 +57,11 @@
1614- this.keyFieldNames = keyFieldNames;
1615- this.gramLength = gramLength;
1616- this.isPrimaryIndex = isPrimaryIndex;
1617-+ this.pendingOp = pendingOp;
1618- }
1619+ /*
1620+ * the array of log pages. The number of log pages is configurable. Pages
1621+ * taken together form an in-memory log buffer.
1622+ */
1623+-
1624+ private IFileBasedBuffer[] logPages;
1625
1626- public Index(String dataverseName, String datasetName, String indexName, IndexType indexType,
1627-- List<String> keyFieldNames, boolean isPrimaryIndex) {
1628-+ List<String> keyFieldNames, boolean isPrimaryIndex, int pendingOp) {
1629- this.dataverseName = dataverseName;
1630- this.datasetName = datasetName;
1631- this.indexName = indexName;
1632-@@ -66,6 +69,7 @@
1633- this.keyFieldNames = keyFieldNames;
1634- this.gramLength = -1;
1635- this.isPrimaryIndex = isPrimaryIndex;
1636-+ this.pendingOp = pendingOp;
1637- }
1638+ private ILogRecordHelper logRecordHelper;
1639+@@ -54,6 +62,7 @@
1640+ * Number of log pages that constitute the in-memory log buffer.
1641+ */
1642+ private int numLogPages;
1643++
1644+ /*
1645+ * Initially all pages have an owner count of 1 that is the LogManager. When
1646+ * a transaction requests to write in a log page, the owner count is
1647+@@ -62,12 +71,11 @@
1648+ * (covering the whole log record). When the content has been put, the log
1649+ * manager computes the checksum and puts it after the content. At this
1650+ * point, the ownership count is decremented as the transaction is done with
1651+- * using the page. When a page is full, the log manager decrements the count
1652+- * by one indicating that it has released its ownership of the log page.
1653+- * There could be other transaction(s) still owning the page (that is they
1654+- * could still be mid-way putting the log content). When the ownership count
1655+- * eventually reaches zero, the thread responsible for flushing the log page
1656+- * is notified and the page is flushed to disk.
1657++ * using the page. When a page is requested to be flushed, logPageFlusher
1658++ * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
1659++ * only if the count is 1(LOG_WRITER: meaning that there is no other
1660++ * transactions who own the page to write logs.) After flushing the page,
1661++ * logPageFlusher set this count to 1.
1662+ */
1663+ private AtomicInteger[] logPageOwnerCount;
1664
1665- public String getDataverseName() {
1666-@@ -95,6 +99,10 @@
1667- public boolean isPrimaryIndex() {
1668- return isPrimaryIndex;
1669- }
1670-+
1671-+ public int getPendingOp() {
1672-+ return pendingOp;
1673-+ }
1674+@@ -78,18 +86,16 @@
1675
1676- public boolean isSecondaryIndex() {
1677- return !isPrimaryIndex();
1678-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java
1679-===================================================================
1680---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (revision 1061)
1681-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/Dataverse.java (working copy)
1682-@@ -27,10 +27,12 @@
1683- // Enforced to be unique within an Asterix cluster..
1684- private final String dataverseName;
1685- private final String dataFormat;
1686-+ private final int pendingOp;
1687+ /*
1688+ * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
1689+- * page is maintained in a map called logPageStatus. A page is ACTIVE when
1690+- * the LogManager can allocate space in the page for writing a log record.
1691+- * Initially all pages are ACTIVE. As transactions fill up space by writing
1692+- * log records, a page may not have sufficient space left for serving a
1693+- * request by a transaction. When this happens, the page is marked INACTIVE.
1694+- * An INACTIVE page with no owners ( logPageOwnerCount.get(<pageIndex>) ==
1695+- * 0) indicates that the page must be flushed to disk before any other log
1696+- * record is written on the page.F
1697++ * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
1698++ * can allocate space in the page for writing a log record. Initially all
1699++ * pages are ACTIVE. As transactions fill up space by writing log records,
1700++ * a page may not have sufficient space left for serving a request by a
1701++ * transaction. When this happens, the page is flushed to disk by calling
1702++ * logPageFlusher.requestFlush(). In the requestFlush(), after groupCommitWaitTime,
1703++ * the page status is set to INACTIVE. Then, once there is no more writer on the
1704++ * page (meaning the corresponding logPageOwnerCount is 1), the page is flushed
1705++ * by the logPageFlusher and its status is reset to ACTIVE.
1706+ */
1707+-
1708+- // private Map<Integer, Integer> logPageStatus = new
1709+- // ConcurrentHashMap<Integer, Integer>();
1710+ private AtomicInteger[] logPageStatus;
1711
1712-- public Dataverse(String dataverseName, String format) {
1713-+ public Dataverse(String dataverseName, String format, int pendingOp) {
1714- this.dataverseName = dataverseName;
1715- this.dataFormat = format;
1716-+ this.pendingOp = pendingOp;
1717+ static class PageState {
1718+@@ -98,41 +104,8 @@
1719 }
1720
1721- public String getDataverseName() {
1722-@@ -40,6 +42,10 @@
1723- public String getDataFormat() {
1724- return dataFormat;
1725- }
1726-+
1727-+ public int getPendingOp() {
1728-+ return pendingOp;
1729-+ }
1730+ private AtomicLong lastFlushedLsn = new AtomicLong(-1);
1731+- private AtomicInteger lastFlushedPage = new AtomicInteger(-1);
1732
1733- @Override
1734- public Object addToCache(MetadataCache cache) {
1735-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
1736-===================================================================
1737---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (revision 1061)
1738-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java (working copy)
1739-@@ -25,6 +25,7 @@
1740- import edu.uci.ics.asterix.common.exceptions.AsterixException;
1741- import edu.uci.ics.asterix.common.functions.FunctionSignature;
1742- import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
1743-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
1744- import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
1745- import edu.uci.ics.asterix.metadata.api.IMetadataNode;
1746- import edu.uci.ics.asterix.metadata.api.IValueExtractor;
1747-@@ -160,7 +161,7 @@
1748- // Add the primary index for the dataset.
1749- InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
1750- Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
1751-- dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true);
1752-+ dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
1753- addIndex(jobId, primaryIndex);
1754- ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
1755- dataset.getDatasetName());
1756-@@ -260,7 +261,7 @@
1757- IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
1758- NoOpOperationCallback.INSTANCE);
1759- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
1760-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
1761-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
1762- // TODO: fix exceptions once new BTree exception model is in hyracks.
1763- indexAccessor.insert(tuple);
1764- //TODO: extract the key from the tuple and get the PKHashValue from the key.
1765-@@ -536,7 +537,7 @@
1766- // The transaction with txnId will have an S lock on the
1767- // resource. Note that lock converters have a higher priority than
1768- // regular waiters in the LockManager.
1769-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
1770-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
1771- indexAccessor.delete(tuple);
1772- //TODO: extract the key from the tuple and get the PKHashValue from the key.
1773- //check how to get the oldValue.
1774-@@ -803,7 +804,9 @@
1775- private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
1776- IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
1777- TransactionContext txnCtx = transactionProvider.getTransactionManager().getTransactionContext(jobId);
1778-- transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
1779-+ //#. currently lock is not needed to access any metadata
1780-+ // since the non-compatible concurrent access is always protected by the latch in the MetadataManager.
1781-+ //transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.S, txnCtx);
1782- IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
1783- long resourceID = index.getResourceID();
1784- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
1785-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java
1786-===================================================================
1787---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (revision 1061)
1788-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataEntity.java (working copy)
1789-@@ -20,6 +20,11 @@
1790- import edu.uci.ics.asterix.metadata.MetadataCache;
1791-
1792- public interface IMetadataEntity extends Serializable {
1793-+
1794-+ public static final int PENDING_NO_OP = 0;
1795-+ public static final int PENDING_ADD_OP = 1;
1796-+ public static final int PENDING_DROP_OP = 2;
1797-+
1798- Object addToCache(MetadataCache cache);
1799-
1800- Object dropFromCache(MetadataCache cache);
1801-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
1802-===================================================================
1803---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (revision 1061)
1804-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java (working copy)
1805-@@ -17,6 +17,8 @@
1806-
1807- import java.rmi.RemoteException;
1808- import java.util.List;
1809-+import java.util.concurrent.locks.ReadWriteLock;
1810-+import java.util.concurrent.locks.ReentrantReadWriteLock;
1811-
1812- import edu.uci.ics.asterix.common.functions.FunctionSignature;
1813- import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
1814-@@ -79,11 +81,10 @@
1815- public class MetadataManager implements IMetadataManager {
1816- // Set in init().
1817- public static MetadataManager INSTANCE;
1818+ /*
1819+- * pendingFlushRequests is a map with key as Integer denoting the page
1820+- * index. When a (transaction) thread discovers the need to flush a page, it
1821+- * puts its Thread object into the corresponding value that is a
1822+- * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
1823+- * this map in order of page index (and circling around). The flusher thread
1824+- * needs to flush pages in order and waits for a thread to deposit an object
1825+- * in the blocking queue corresponding to the next page in order. A request
1826+- * to flush a page is conveyed to the flush thread by simply depositing an
1827+- * object in to corresponding blocking queue. It is blocking in the sense
1828+- * that the flusher thread will continue to wait for an object to arrive in
1829+- * the queue. The object itself is ignored by the fliusher and just acts as
1830+- * a signal/event that a page needs to be flushed.
1831+- */
1832 -
1833- private final MetadataCache cache = new MetadataCache();
1834- private IAsterixStateProxy proxy;
1835- private IMetadataNode metadataNode;
1836+- private LinkedBlockingQueue[] pendingFlushRequests;
1837 -
1838-+
1839- public MetadataManager(IAsterixStateProxy proxy) {
1840- if (proxy == null) {
1841- throw new Error("Null proxy given to MetadataManager.");
1842-@@ -206,11 +207,14 @@
1843+- /*
1844+- * ICommitResolver is an interface that provides an API that can answer a
1845+- * simple boolean - Given the commit requests so far, should a page be
1846+- * flushed. The implementation of the interface contains the logic (or you
1847+- * can say the policy) for commit. It could be group commit in which case
1848+- * the commit resolver may not return a true indicating that it wishes to
1849+- * delay flushing of the page.
1850+- */
1851+- private ICommitResolver commitResolver;
1852+-
1853+- /*
1854+- * An object that keeps track of the submitted commit requests.
1855+- */
1856+- private CommitRequestStatistics commitRequestStatistics;
1857+-
1858+- /*
1859+ * When the transaction eco-system comes to life, the log manager positions
1860+ * itself to the end of the last written log. the startingLsn represent the
1861+ * lsn value of the next log record to be written after a system (re)start.
1862+@@ -146,16 +119,10 @@
1863+ */
1864+ private AtomicLong lsn = new AtomicLong(0);
1865
1866- @Override
1867- public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
1868-+ // add dataset into metadataNode
1869- try {
1870- metadataNode.addDataset(ctx.getJobId(), dataset);
1871- } catch (RemoteException e) {
1872- throw new MetadataException(e);
1873- }
1874-+
1875-+ // reflect the dataset into the cache
1876- ctx.addDataset(dataset);
1877+- /*
1878+- * A map that tracks the flush requests submitted for each page. The
1879+- * requests for a page are cleared when the page is flushed.
1880+- */
1881+- public LinkedBlockingQueue<Thread> getPendingFlushRequests(int pageIndex) {
1882+- return pendingFlushRequests[pageIndex];
1883+- }
1884++ private List<HashMap<TransactionContext, Integer>> activeTxnCountMaps;
1885+
1886+- public void addFlushRequest(int pageIndex) {
1887+- pendingFlushRequests[pageIndex].add(pendingFlushRequests);
1888++ public void addFlushRequest(int pageIndex, long lsn, boolean isSynchronous) {
1889++ logPageFlusher.requestFlush(pageIndex, lsn, isSynchronous);
1890 }
1891
1892-@@ -585,4 +589,5 @@
1893+ public AtomicLong getLastFlushedLsn() {
1894+@@ -233,19 +200,12 @@
1895+ numLogPages = logManagerProperties.getNumLogPages();
1896+ logPageOwnerCount = new AtomicInteger[numLogPages];
1897+ logPageStatus = new AtomicInteger[numLogPages];
1898+- pendingFlushRequests = new LinkedBlockingQueue[numLogPages];
1899+- if (logManagerProperties.getGroupCommitWaitPeriod() > 0) { // configure
1900+- // the
1901+- // Commit
1902+- // Resolver
1903+- commitResolver = new GroupCommitResolver(); // Group Commit is
1904+- // enabled
1905+- commitRequestStatistics = new CommitRequestStatistics(numLogPages);
1906+- } else {
1907+- commitResolver = new BasicCommitResolver(); // the basic commit
1908+- // resolver
1909++
1910++ activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
1911++ for (int i = 0; i < numLogPages; i++) {
1912++ activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
1913 }
1914- return adapter;
1915- }
1916+- this.commitResolver.init(this); // initialize the commit resolver
1917 +
1918- }
1919-\ No newline at end of file
1920-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java
1921-===================================================================
1922---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (revision 1061)
1923-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataTransactionContext.java (working copy)
1924-@@ -19,6 +19,7 @@
1925+ logPages = new FileBasedBuffer[numLogPages];
1926
1927- import edu.uci.ics.asterix.common.functions.FunctionSignature;
1928- import edu.uci.ics.asterix.external.dataset.adapter.AdapterIdentifier;
1929-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
1930- import edu.uci.ics.asterix.metadata.entities.Dataset;
1931- import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
1932- import edu.uci.ics.asterix.metadata.entities.Datatype;
1933-@@ -104,19 +105,19 @@
1934- }
1935+ /*
1936+@@ -264,7 +224,6 @@
1937+ for (int i = 0; i < numLogPages; i++) {
1938+ logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
1939+ logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
1940+- pendingFlushRequests[i] = new LinkedBlockingQueue<Thread>();
1941+ }
1942
1943- public void dropDataset(String dataverseName, String datasetName) {
1944-- Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1);
1945-+ Dataset dataset = new Dataset(dataverseName, datasetName, null, null, null, -1, IMetadataEntity.PENDING_NO_OP);
1946- droppedCache.addDatasetIfNotExists(dataset);
1947- logAndApply(new MetadataLogicalOperation(dataset, false));
1948+ /*
1949+@@ -278,9 +237,9 @@
1950+ * daemon thread so that it does not stop the JVM from exiting when all
1951+ * other threads are done with their work.
1952+ */
1953+- LogPageFlushThread logFlusher = new LogPageFlushThread(this);
1954+- logFlusher.setDaemon(true);
1955+- logFlusher.start();
1956++ logPageFlusher = new LogPageFlushThread(this);
1957++ logPageFlusher.setDaemon(true);
1958++ logPageFlusher.start();
1959 }
1960
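
This hunk wires in the renamed logPageFlusher field: flushing is now owned by one long-lived daemon thread instead of ad-hoc per-page queues and a commit resolver, and callers simply submit requests via requestFlush(pageIndex, lsn, isSynchronous). A hedged sketch of that dedicated-flusher shape (reduced to a bare page index; the patch's group-commit wait, ownership counts, and synchronous mode are omitted):

    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative sketch: one daemon thread serializes all page flushes.
    class PageFlusher extends Thread {
        private final LinkedBlockingQueue<Integer> requests = new LinkedBlockingQueue<>();

        PageFlusher() {
            setDaemon(true); // must not keep the JVM alive, as in the patch
        }

        void requestFlush(int pageIndex) {
            requests.add(pageIndex); // called by transaction threads
        }

        @Override
        public void run() {
            try {
                while (true) {
                    int pageIndex = requests.take(); // block until work arrives
                    flushPage(pageIndex);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit quietly on shutdown
            }
        }

        private void flushPage(int pageIndex) {
            // write the page, force it to disk, then mark it ACTIVE again
        }
    }
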
1961- public void dropIndex(String dataverseName, String datasetName, String indexName) {
1962-- Index index = new Index(dataverseName, datasetName, indexName, null, null, false);
1963-+ Index index = new Index(dataverseName, datasetName, indexName, null, null, false, IMetadataEntity.PENDING_NO_OP);
1964- droppedCache.addIndexIfNotExists(index);
1965- logAndApply(new MetadataLogicalOperation(index, false));
1966- }
1967-
1968- public void dropDataverse(String dataverseName) {
1969-- Dataverse dataverse = new Dataverse(dataverseName, null);
1970-+ Dataverse dataverse = new Dataverse(dataverseName, null, IMetadataEntity.PENDING_NO_OP);
1971- droppedCache.addDataverseIfNotExists(dataverse);
1972- logAndApply(new MetadataLogicalOperation(dataverse, false));
1973- }
1974-@@ -162,7 +163,7 @@
1975+ public int getLogPageIndex(long lsnValue) {
1976+@@ -312,7 +271,7 @@
1977+ */
1978+ private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
1979+ if (logPageStatus[pageIndex].get() == PageState.ACTIVE
1980+- && getLogPageOwnershipCount(pageIndex).get() >= PageOwnershipStatus.LOG_WRITER) {
1981++ && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
1982+ return;
1983 }
1984- return droppedCache.getDataset(dataverseName, datasetName) != null;
1985- }
1986--
1987+ try {
1988+@@ -338,47 +297,40 @@
1989+ */
1990+ private long getLsn(int entrySize, byte logType) throws ACIDException {
1991+ long pageSize = logManagerProperties.getLogPageSize();
1992+- boolean requiresFlushing = logType == LogType.COMMIT;
1993 +
1994- public boolean indexIsDropped(String dataverseName, String datasetName, String indexName) {
1995- if (droppedCache.getDataverse(dataverseName) != null) {
1996- return true;
1997-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java
1998-===================================================================
1999---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (revision 1061)
2000-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataRecordTypes.java (working copy)
2001-@@ -80,10 +80,11 @@
2002- public static final int DATAVERSE_ARECORD_NAME_FIELD_INDEX = 0;
2003- public static final int DATAVERSE_ARECORD_FORMAT_FIELD_INDEX = 1;
2004- public static final int DATAVERSE_ARECORD_TIMESTAMP_FIELD_INDEX = 2;
2005-+ public static final int DATAVERSE_ARECORD_PENDINGOP_FIELD_INDEX = 3;
2006+ while (true) {
2007+ boolean forwardPage = false;
2008+- boolean shouldFlushPage = false;
2009+ long old = lsn.get();
2010+- int pageIndex = getLogPageIndex(old); // get the log page
2011+- // corresponding to the
2012+- // current lsn value
2013++
2014++ //get the log page corresponding to the current lsn value
2015++ int pageIndex = getLogPageIndex(old);
2016+ long retVal = old;
2017+- long next = old + entrySize; // the lsn value for the next request,
2018+- // if the current request is served.
2019++
2020++ // the lsn value for the next request if the current request is served.
2021++ long next = old + entrySize;
2022+ int prevPage = -1;
2023+- if ((next - 1) / pageSize != old / pageSize // check if the log
2024+- // record will cross
2025+- // page boundaries, a
2026+- // case that is not
2027+- // allowed.
2028+- || (next % pageSize == 0)) {
2029++
2030++ // check if the log record will cross page boundaries, a case that is not allowed.
2031++ if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
2032++
2033+ if ((old != 0 && old % pageSize == 0)) {
2034+- retVal = old; // On second thought, this shall never be the
2035+- // case as it means that the lsn is
2036+- // currently at the beginning of a page and
2037+- // we still need to forward the page which
2038+- // means that the entrySize exceeds a log
2039+- // page size. If this is the case, an
2040+- // exception is thrown before calling this
2041+- // API.
2042+- // would remove this case.
2043++ // On second thought, this shall never be the case as it means that the lsn is
2044++ // currently at the beginning of a page and we still need to forward the page which
2045++ // means that the entrySize exceeds a log page size. If this is the case, an
2046++ // exception is thrown before calling this API, which would remove this case.
2047++ retVal = old;
2048
2049- private static final ARecordType createDataverseRecordType() {
2050-- return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp" },
2051-- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING }, true);
2052-+ return new ARecordType("DataverseRecordType", new String[] { "DataverseName", "DataFormat", "Timestamp", "PendingOp" },
2053-+ new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AINT32 }, true);
2054- }
2055+ } else {
2056+- retVal = ((old / pageSize) + 1) * pageSize; // set the lsn
2057+- // to point to
2058+- // the beginning
2059+- // of the next
2060+- // page.
2061++ // set the lsn to point to the beginning of the next page.
2062++ retVal = ((old / pageSize) + 1) * pageSize;
2063+ }
2064++
2065+ next = retVal;
2066+- forwardPage = true; // as the log record shall cross log page
2067+- // boundary, we must re-assign the lsn (so
2068+- // that the log record begins on a different
2069+- // location.
2070++
2071++ // as the log record shall cross log page boundary, we must re-assign the lsn so
2072++ // that the log record begins on a different location.
2073++ forwardPage = true;
2074++
2075+ prevPage = pageIndex;
2076+ pageIndex = getNextPageInSequence(pageIndex);
2077+ }
2078+@@ -397,109 +349,51 @@
2079+ */
2080+ waitUntillPageIsAvailableForWritingLog(pageIndex);
2081
2082- // Helper constants for accessing fields in an ARecord of anonymous type
2083-@@ -158,10 +159,11 @@
2084- public static final int DATASET_ARECORD_FEEDDETAILS_FIELD_INDEX = 6;
2085- public static final int DATASET_ARECORD_TIMESTAMP_FIELD_INDEX = 7;
2086- public static final int DATASET_ARECORD_DATASETID_FIELD_INDEX = 8;
2087-+ public static final int DATASET_ARECORD_PENDINGOP_FIELD_INDEX = 9;
2088+- if (!forwardPage && requiresFlushing) {
2089+- shouldFlushPage = commitResolver.shouldCommitPage(pageIndex, this, commitRequestStatistics);
2090+- if (shouldFlushPage) {
2091+- next = ((next / pageSize) + 1) * pageSize; /*
2092+- * next
2093+- * represents the
2094+- * next value of
2095+- * lsn after this
2096+- * log record has
2097+- * been written.
2098+- * If the page
2099+- * needs to be
2100+- * flushed, then
2101+- * we do not give
2102+- * any more LSNs
2103+- * from this
2104+- * page.
2105+- */
2106+- }
2107+- }
2108+- if (!lsn.compareAndSet(old, next)) { // Atomic call -> returns true
2109+- // only when the value
2110+- // represented by lsn is same as
2111+- // "old". The value is updated
2112+- // to "next".
2113++ if (!lsn.compareAndSet(old, next)) {
2114++ // Atomic call -> returns true only when the value represented by lsn is the same as
2115++ // "old". The value is updated to "next".
2116+ continue;
2117+ }
2118
2119- private static final ARecordType createDatasetRecordType() {
2120- String[] fieldNames = { "DataverseName", "DatasetName", "DataTypeName", "DatasetType", "InternalDetails",
2121-- "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId" };
2122-+ "ExternalDetails", "FeedDetails", "Timestamp", "DatasetId", "PendingOp" };
2123+ if (forwardPage) {
2124+- //TODO
2125+- //this is not safe since the incoming thread may reach the same page slot with this page
2126+- //(differ by the log buffer size)
2127+- logPageStatus[prevPage].set(PageState.INACTIVE); // mark
2128+- // previous
2129+- // page
2130+- // inactive
2131++ addFlushRequest(prevPage, old, false);
2132
2133- List<IAType> internalRecordUnionList = new ArrayList<IAType>();
2134- internalRecordUnionList.add(BuiltinType.ANULL);
2135-@@ -179,7 +181,8 @@
2136- AUnionType feedRecordUnion = new AUnionType(feedRecordUnionList, null);
2137-
2138- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
2139-- internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32 };
2140-+ internalRecordUnion, externalRecordUnion, feedRecordUnion, BuiltinType.ASTRING, BuiltinType.AINT32,
2141-+ BuiltinType.AINT32 };
2142- return new ARecordType("DatasetRecordType", fieldNames, fieldTypes, true);
2143+- /*
2144+- * decrement on the behalf of the log manager. if there are no
2145+- * more owners (count == 0) the page must be marked as a
2146+- * candidate to be flushed.
2147+- */
2148+- int pageDirtyCount = getLogPageOwnershipCount(prevPage).decrementAndGet();
2149+- if (pageDirtyCount == 0) {
2150+- addFlushRequest(prevPage);
2151+- }
2152+-
2153+- /*
2154+- * The transaction thread that discovers the need to forward a
2155+- * page is made to re-acquire a lsn.
2156+- */
2157++ // The transaction thread that discovers the need to forward a
2158++ // page is made to re-acquire an LSN.
2159+ continue;
2160++
2161+ } else {
2162+- /*
2163+- * the transaction thread has been given a space in a log page,
2164+- * but is made to wait until the page is available.
2165+- */
2166++ // the transaction thread has been given a space in a log page,
2167++ // but is made to wait until the page is available.
2168++ // (Is this needed? When does this wait happen?)
2169+ waitUntillPageIsAvailableForWritingLog(pageIndex);
2170+- /*
2171+- * increment the counter as the transaction thread now holds a
2172+- * space in the log page and hence is an owner.
2173+- */
2174++
2175++ // increment the counter as the transaction thread now holds a
2176++ // space in the log page and hence is an owner.
2177+ logPageOwnerCount[pageIndex].incrementAndGet();
2178+- }
2179+- if (requiresFlushing) {
2180+- if (!shouldFlushPage) {
2181+- /*
2182+- * the log record requires the page to be flushed but under
2183+- * the commit policy, the flush task has been deferred. The
2184+- * transaction thread submits its request to flush the page.
2185+- */
2186+- commitRequestStatistics.registerCommitRequest(pageIndex);
2187+- } else {
2188+- /*
2189+- * the flush request was approved by the commit resolver.
2190+- * Thus the page is marked INACTIVE as no more logs will be
2191+- * written on this page. The log manager needs to release
2192+- * its ownership. Note that transaction threads may still
2193+- * continue to be owners of the log page till they fill up
2194+- * the space allocated to them.
2195+- */
2196+- logPageStatus[pageIndex].set(PageState.INACTIVE);
2197+- logPageOwnerCount[pageIndex].decrementAndGet(); // on
2198+- // the
2199+- // behalf
2200+- // of
2201+- // log
2202+- // manager
2203++
2204++ // Before the count is incremented, if the flusher flushed the allocated page,
2205++ // then retry to get a new LSN. Otherwise, the log with the allocated LSN will be lost.
2206++ if (lastFlushedLsn.get() >= retVal) {
2207++ logPageOwnerCount[pageIndex].decrementAndGet();
2208++ continue;
2209+ }
2210+ }
2211++
2212+ return retVal;
2213+ }
2214 }
2215
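
The rewritten getLsn() above is an optimistic, lock-free allocator: read lsn, compute where the record would end, move to the next page boundary if the record would cross one, then publish the new value with compareAndSet and retry on contention. A compact sketch of that loop under the same boundary rule (this sketch allocates directly at the new page start, whereas the patch re-enters the loop after forwarding the page, and it omits the lastFlushedLsn re-check and ownership counting):

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative sketch of CAS-based LSN allocation.
    class LsnAllocator {
        private final AtomicLong lsn = new AtomicLong(0);
        private final long pageSize;

        LsnAllocator(long pageSize) {
            this.pageSize = pageSize;
        }

        long allocate(int entrySize) {
            while (true) {
                long old = lsn.get();
                long retVal = old;
                long next = old + entrySize;
                // a record must not cross a page boundary: if it would,
                // restart it at the beginning of the next page
                if ((next - 1) / pageSize != old / pageSize || next % pageSize == 0) {
                    retVal = ((old / pageSize) + 1) * pageSize;
                    next = retVal + entrySize;
                }
                if (lsn.compareAndSet(old, next)) {
                    return retVal; // this thread owns [retVal, retVal + entrySize)
                }
                // another thread advanced lsn first; retry with a fresh value
            }
        }
    }
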
2216-@@ -264,13 +267,14 @@
2217- public static final int INDEX_ARECORD_SEARCHKEY_FIELD_INDEX = 4;
2218- public static final int INDEX_ARECORD_ISPRIMARY_FIELD_INDEX = 5;
2219- public static final int INDEX_ARECORD_TIMESTAMP_FIELD_INDEX = 6;
2220-+ public static final int INDEX_ARECORD_PENDINGOP_FIELD_INDEX = 7;
2221+ @Override
2222+- public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
2223++ public void log(byte logType, TransactionContext txnCtx, int datasetId, int PKHashValue, long resourceId,
2224+ byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
2225+ LogicalLogLocator logicalLogLocator) throws ACIDException {
2226+- /*
2227+- * logLocator is a re-usable object that is appropriately set in each
2228+- * invocation. If the reference is null, the log manager must throw an
2229+- * exception
2230+- */
2231++
2232++ HashMap<TransactionContext, Integer> map = null;
2233++ int activeTxnCount;
2234++
2235++ // logLocator is a re-usable object that is appropriately set in each invocation.
2236++ // If the reference is null, the log manager must throw an exception.
2237+ if (logicalLogLocator == null) {
2238+ throw new ACIDException(
2239+ " you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +"
2240+@@ -519,20 +413,19 @@
2241
2242- private static final ARecordType createIndexRecordType() {
2243- AOrderedListType olType = new AOrderedListType(BuiltinType.ASTRING, null);
2244- String[] fieldNames = { "DataverseName", "DatasetName", "IndexName", "IndexStructure", "SearchKey",
2245-- "IsPrimary", "Timestamp" };
2246-+ "IsPrimary", "Timestamp", "PendingOp" };
2247- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
2248-- olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING };
2249-+ olType, BuiltinType.ABOOLEAN, BuiltinType.ASTRING, BuiltinType.AINT32 };
2250- return new ARecordType("IndexRecordType", fieldNames, fieldTypes, true);
2251- };
2252+ // all constraints checked and we are good to go and acquire a lsn.
2253+ long previousLSN = -1;
2254+- long currentLSN; // the will be set to the location (a long value)
2255+- // where the log record needs to be placed.
2256
2257-Index: asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
2258-===================================================================
2259---- asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (revision 1061)
2260-+++ asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java (working copy)
2261-@@ -31,6 +31,7 @@
2262- import edu.uci.ics.asterix.metadata.IDatasetDetails;
2263- import edu.uci.ics.asterix.metadata.MetadataManager;
2264- import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
2265-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
2266- import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
2267- import edu.uci.ics.asterix.metadata.entities.AsterixBuiltinTypeMap;
2268- import edu.uci.ics.asterix.metadata.entities.Dataset;
2269-@@ -226,7 +227,7 @@
2270- public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
2271- String dataverseName = MetadataPrimaryIndexes.DATAVERSE_DATASET.getDataverseName();
2272- String dataFormat = NonTaggedDataFormat.NON_TAGGED_DATA_FORMAT;
2273-- MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat));
2274-+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dataverseName, dataFormat, IMetadataEntity.PENDING_NO_OP));
2275- }
2276+- /*
2277+- * The logs written by a transaction need to be linked to each other for
2278+- * a successful rollback/recovery. However there could be multiple
2279+- * threads operating concurrently that are part of a common transaction.
2280+- * These threads need to synchronize and record the lsn corresponding to
2281+- * the last log record written by (any thread of) the transaction.
2282+- */
2283+- synchronized (context) {
2284+- previousLSN = context.getLastLogLocator().getLsn();
2285++ // this will be set to the location (a long value) where the log record needs to be placed.
2286++ long currentLSN;
2287++
2288++ // The logs written by a transaction need to be linked to each other for
2289++ // a successful rollback/recovery. However there could be multiple
2290++ // threads operating concurrently that are part of a common transaction.
2291++ // These threads need to synchronize and record the lsn corresponding to
2292++ // the last log record written by (any thread of) the transaction.
2293++ synchronized (txnCtx) {
2294++ previousLSN = txnCtx.getLastLogLocator().getLsn();
2295+ currentLSN = getLsn(totalLogSize, logType);
2296+- context.setLastLSN(currentLSN);
2297++ txnCtx.setLastLSN(currentLSN);
2298+ if (IS_DEBUG_MODE) {
2299+ System.out.println("--------------> LSN(" + currentLSN + ") is allocated");
2300+ }
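
The synchronized (txnCtx) block above exists because several threads may log on behalf of one transaction: each record's header stores that transaction's previous LSN, and rollback/recovery walk this backward chain, so reading the old tail, allocating, and updating the tail must be atomic per transaction. A small sketch of the invariant (TxnLogChain and the allocator parameter are hypothetical names, not the patch's API):

    import java.util.function.IntToLongFunction;

    // Illustrative sketch: per-transaction chaining of log records.
    class TxnLogChain {
        private long lastLsn = -1; // -1 marks the start of the chain

        // returns {previousLsn, currentLsn}; previousLsn goes in the header
        synchronized long[] append(IntToLongFunction lsnAllocator, int logSize) {
            long previousLsn = lastLsn;
            long currentLsn = lsnAllocator.applyAsLong(logSize);
            lastLsn = currentLsn;
            return new long[] { previousLsn, currentLsn };
        }
    }
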
2301+@@ -547,48 +440,37 @@
2302+ * performed correctly that is ownership is released.
2303+ */
2304
2305- public static void insertInitialDatasets(MetadataTransactionContext mdTxnCtx) throws Exception {
2306-@@ -236,7 +237,7 @@
2307- primaryIndexes[i].getNodeGroupName());
2308- MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
2309- primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
2310-- id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId()));
2311-+ id, DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(), IMetadataEntity.PENDING_NO_OP));
2312- }
2313- }
2314+- boolean decremented = false; // indicates if the transaction thread
2315+- // has release ownership of the
2316+- // page.
2317+- boolean addedFlushRequest = false; // indicates if the transaction
2318+- // thread has submitted a flush
2319+- // request.
2320++ // indicates if the transaction thread has released ownership of the page.
2321++ boolean decremented = false;
2322
2323-@@ -267,7 +268,7 @@
2324- for (int i = 0; i < secondaryIndexes.length; i++) {
2325- MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(secondaryIndexes[i].getDataverseName(),
2326- secondaryIndexes[i].getIndexedDatasetName(), secondaryIndexes[i].getIndexName(), IndexType.BTREE,
2327-- secondaryIndexes[i].getPartitioningExpr(), false));
2328-+ secondaryIndexes[i].getPartitioningExpr(), false, IMetadataEntity.PENDING_NO_OP));
2329- }
2330- }
2331+ int pageIndex = (int) getLogPageIndex(currentLSN);
2332
2333-Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
2334-===================================================================
2335---- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1061)
2336-+++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
2337-@@ -95,11 +95,11 @@
2338- File testFile = tcCtx.getTestFile(cUnit);
2339-
2340- /*************** to avoid run failure cases ****************
2341-- if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
2342-+ if (!testFile.getAbsolutePath().contains("index-selection/")) {
2343- continue;
2344- }
2345- ************************************************************/
2346+- /*
2347+- * the lsn has been obtained for the log record. need to set the
2348+- * LogLocator instance accordingly.
2349+- */
2350 -
2351-+
2352- File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
2353- File actualFile = new File(PATH_ACTUAL + File.separator
2354- + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
2355-Index: asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
2356-===================================================================
2357---- asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (revision 1061)
2358-+++ asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java (working copy)
2359-@@ -90,7 +90,7 @@
2360++ // the lsn has been obtained for the log record. need to set the
2361++ // LogLocator instance accordingly.
2362+ try {
2363+-
2364+ logicalLogLocator.setBuffer(logPages[pageIndex]);
2365+ int pageOffset = getLogPageOffset(currentLSN);
2366+ logicalLogLocator.setMemoryOffset(pageOffset);
2367
2368- private static Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
2369+- /*
2370+- * write the log header.
2371+- */
2372+- logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
2373++ // write the log header.
2374++ logRecordHelper.writeLogHeader(logicalLogLocator, logType, txnCtx, datasetId, PKHashValue, previousLSN,
2375+ resourceId, resourceMgrId, logContentSize);
2376
2377-- public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
2378-+ public static JobSpecification createDropDatasetJobSpec(CompiledDatasetDropStatement datasetDropStmt,
2379- AqlMetadataProvider metadataProvider) throws AlgebricksException, HyracksDataException, RemoteException,
2380- ACIDException, AsterixException {
2381+ // increment the offset so that the transaction can fill up the
2382+ // content in the correct region of the allocated space.
2383+ logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
2384
2385-@@ -111,67 +111,10 @@
2386- throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
2387- }
2388- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
2389-- return new JobSpecification[0];
2390-+ return new JobSpecification();
2391- }
2392--
2393-- List<Index> datasetIndexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
2394-- dataset.getDatasetName());
2395-- int numSecondaryIndexes = 0;
2396-- for (Index index : datasetIndexes) {
2397-- if (index.isSecondaryIndex()) {
2398-- numSecondaryIndexes++;
2399-- }
2400-- }
2401-- JobSpecification[] specs;
2402-- if (numSecondaryIndexes > 0) {
2403-- specs = new JobSpecification[numSecondaryIndexes + 1];
2404-- int i = 0;
2405-- // First, drop secondary indexes.
2406-- for (Index index : datasetIndexes) {
2407-- if (index.isSecondaryIndex()) {
2408-- specs[i] = new JobSpecification();
2409-- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadataProvider
2410-- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
2411-- datasetName, index.getIndexName());
2412-- IIndexDataflowHelperFactory dfhFactory;
2413-- switch (index.getIndexType()) {
2414-- case BTREE:
2415-- dfhFactory = new LSMBTreeDataflowHelperFactory(
2416-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
2417-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER,
2418-- AsterixRuntimeComponentsProvider.LSMBTREE_PROVIDER);
2419-- break;
2420-- case RTREE:
2421-- dfhFactory = new LSMRTreeDataflowHelperFactory(
2422-- new IPrimitiveValueProviderFactory[] { null }, RTreePolicyType.RTREE,
2423-- new IBinaryComparatorFactory[] { null },
2424-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
2425-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
2426-- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, null);
2427-- break;
2428-- case NGRAM_INVIX:
2429-- case WORD_INVIX:
2430-- dfhFactory = new LSMInvertedIndexDataflowHelperFactory(
2431-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
2432-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
2433-- AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
2434-- break;
2435-- default:
2436-- throw new AsterixException("Unknown index type provided.");
2437-- }
2438-- IndexDropOperatorDescriptor secondaryBtreeDrop = new IndexDropOperatorDescriptor(specs[i],
2439-- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
2440-- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, idxSplitsAndConstraint.first, dfhFactory);
2441-- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
2442-- idxSplitsAndConstraint.second);
2443-- i++;
2444-- }
2445-- }
2446-- } else {
2447-- specs = new JobSpecification[1];
2448-- }
2449-+
2450- JobSpecification specPrimary = new JobSpecification();
2451-- specs[specs.length - 1] = specPrimary;
2452+- // a COMMIT log record does not have any content
2453+- // and hence the logger (responsible for putting the log content) is
2454+- // not invoked.
2455++ // a COMMIT log record does not have any content and hence
2456++ // the logger (responsible for putting the log content) is not invoked.
2457+ if (logContentSize != 0) {
2458+- logger.preLog(context, reusableLogContentObject);
2459++ logger.preLog(txnCtx, reusableLogContentObject);
2460+ }
2461
2462- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
2463- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(), datasetName,
2464-@@ -187,7 +130,7 @@
2465+ if (logContentSize != 0) {
2466+ // call the logger implementation and ask to fill in the log
2467+ // record content at the allocated space.
2468+- logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
2469+- logger.postLog(context, reusableLogContentObject);
2470++ logger.log(txnCtx, logicalLogLocator, logContentSize, reusableLogContentObject);
2471++ logger.postLog(txnCtx, reusableLogContentObject);
2472+ if (IS_DEBUG_MODE) {
2473+ logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset()
2474+ - logRecordHelper.getLogHeaderSize(logType));
2475+@@ -597,10 +479,8 @@
2476+ }
2477+ }
2478
2479- specPrimary.addRoot(primaryBtreeDrop);
2480+- /*
2481+- * The log record has been written. For integrity checks, compute
2482+- * the checksum and put it at the end of the log record.
2483+- */
2484++ // The log record has been written. For integrity checks, compute
2485++ // the checksum and put it at the end of the log record.
2486+ int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
2487+ int length = totalLogSize - logRecordHelper.getLogChecksumSize();
2488+ long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
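
For reference, the checksum computed above covers everything from the start of the record header through the end of the body; only the trailing checksum field itself is excluded. A minimal sketch of that computation, using java.util.zip.CRC32 as an illustrative stand-in for DataUtil.getChecksum:

    import java.util.zip.CRC32;

    // Sketch: checksum over [header | body]; the record's last
    // checksumSize bytes are reserved for the checksum itself.
    final class LogChecksum {
        static long compute(byte[] page, int recordStart, int totalLogSize, int checksumSize) {
            CRC32 crc = new CRC32();
            crc.update(page, recordStart, totalLogSize - checksumSize);
            return crc.getValue();
        }
    }
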
2489+@@ -611,46 +491,31 @@
2490+ System.out.println("--------------> LSN(" + currentLSN + ") is written");
2491+ }
2492
2493-- return specs;
2494-+ return specPrimary;
2495- }
2496+- /*
2497+- * release the ownership as the log record has been placed in
2498+- * created space.
2499+- */
2500+- int pageDirtyCount = logPageOwnerCount[pageIndex].decrementAndGet();
2501++ // release the ownership as the log record has been placed in the allocated space.
2502++ logPageOwnerCount[pageIndex].decrementAndGet();
2503
2504- public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
2505-Index: asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
2506-===================================================================
2507---- asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (revision 1061)
2508-+++ asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java (working copy)
2509-@@ -21,6 +21,8 @@
2510- import java.util.HashMap;
2511- import java.util.List;
2512- import java.util.Map;
2513-+import java.util.concurrent.locks.ReadWriteLock;
2514-+import java.util.concurrent.locks.ReentrantReadWriteLock;
2515+ // indicating that the transaction thread has released ownership
2516+ decremented = true;
2517
2518- import org.json.JSONException;
2519+- /*
2520+- * If the transaction thread happens to be the last owner of the log
2521+- * page the page must by marked as a candidate to be flushed.
2522+- */
2523+- if (pageDirtyCount == 0 && logPageStatus[pageIndex].get() == PageState.INACTIVE) {
2524+- addFlushRequest(pageIndex);
2525+- addedFlushRequest = true;
2526+- }
2527+-
2528+- /*
2529+- * If the log type is commit, a flush request is registered, if the
2530+- * log record has not reached the disk. It may be possible that this
2531+- * thread does not get CPU cycles and in-between the log record has
2532+- * been flushed to disk because the containing log page filled up.
2533+- */
2534+- if (logType == LogType.COMMIT) {
2535+- synchronized (logPages[pageIndex]) {
2536+- while (getLastFlushedLsn().get() < currentLSN) {
2537+- logPages[pageIndex].wait();
2538+- }
2539++ if (logType == LogType.ENTITY_COMMIT) {
2540++ map = activeTxnCountMaps.get(pageIndex);
2541++ if (map.containsKey(txnCtx)) {
2542++ activeTxnCount = (Integer) map.get(txnCtx);
2543++ activeTxnCount++;
2544++ map.put(txnCtx, activeTxnCount);
2545++ } else {
2546++ map.put(txnCtx, 1);
2547+ }
2548++ addFlushRequest(pageIndex, currentLSN, false);
2549++ } else if (logType == LogType.COMMIT) {
2550++ addFlushRequest(pageIndex, currentLSN, true);
2551+ }
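
The branch above is the new bookkeeping for entity-level commits: instead of forcing the page to disk synchronously (as a full COMMIT still does), an ENTITY_COMMIT bumps a per-page counter for the owning transaction and files an asynchronous flush request. A self-contained sketch of that counting step, with Object standing in for the patch's TransactionContext; the counts are drained later, once the page reaches disk (see decrementActiveTxnCountOnIndexes below):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;

    // Sketch: one map per log page; key = transaction, value = number of
    // ENTITY_COMMIT records that transaction wrote into the page.
    final class EntityCommitCounts {
        private final List<HashMap<Object, Integer>> activeTxnCountMaps;

        EntityCommitCounts(int numLogPages) {
            activeTxnCountMaps = new ArrayList<HashMap<Object, Integer>>(numLogPages);
            for (int i = 0; i < numLogPages; i++) {
                activeTxnCountMaps.add(new HashMap<Object, Integer>());
            }
        }

        // called under the log page lock, mirroring the hunk above
        void record(int pageIndex, Object txnCtx) {
            HashMap<Object, Integer> map = activeTxnCountMaps.get(pageIndex);
            Integer count = map.get(txnCtx);
            map.put(txnCtx, count == null ? 1 : count + 1);
        }
    }
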
2552
2553-@@ -68,6 +70,7 @@
2554- import edu.uci.ics.asterix.metadata.MetadataException;
2555- import edu.uci.ics.asterix.metadata.MetadataManager;
2556- import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
2557-+import edu.uci.ics.asterix.metadata.api.IMetadataEntity;
2558- import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
2559- import edu.uci.ics.asterix.metadata.entities.Dataset;
2560- import edu.uci.ics.asterix.metadata.entities.Datatype;
2561-@@ -112,6 +115,7 @@
2562- private final PrintWriter out;
2563- private final SessionConfig sessionConfig;
2564- private final DisplayFormat pdf;
2565-+ private final ReadWriteLock cacheLatch;
2566- private Dataverse activeDefaultDataverse;
2567- private List<FunctionDecl> declaredFunctions;
2568+ } catch (Exception e) {
2569+ e.printStackTrace();
2570+- throw new ACIDException(context, "Thread: " + Thread.currentThread().getName()
2571++ throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
2572+ + " logger encountered exception", e);
2573+ } finally {
2574+- /*
2575+- * If an exception was encountered and we did not release ownership
2576+- */
2577+ if (!decremented) {
2578+ logPageOwnerCount[pageIndex].decrementAndGet();
2579+ }
2580+@@ -667,9 +532,6 @@
2581
2582-@@ -121,6 +125,7 @@
2583- this.out = out;
2584- this.sessionConfig = pc;
2585- this.pdf = pdf;
2586-+ this.cacheLatch = new ReentrantReadWriteLock(true);
2587- declaredFunctions = getDeclaredFunctions(aqlStatements);
2588+ logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
2589+ logManagerProperties.getLogPageSize());
2590+-
2591+- //TODO Check if this is necessary
2592+- //Arrays.fill(logPages[pageIndex].getArray(), (byte) 0);
2593 }
2594
2595-@@ -143,8 +148,7 @@
2596+ @Override
2597+@@ -747,16 +609,13 @@
2598+ //minimize memory allocation overhead. The current code allocates a buffer of the log page size for each log record read.
2599
2600- for (Statement stmt : aqlStatements) {
2601- validateOperation(activeDefaultDataverse, stmt);
2602-- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2603-- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(mdTxnCtx, activeDefaultDataverse);
2604-+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(activeDefaultDataverse);
2605- metadataProvider.setWriterFactory(writerFactory);
2606- metadataProvider.setOutputFile(outputFile);
2607- metadataProvider.setConfig(config);
2608-@@ -253,15 +257,9 @@
2609- }
2610+ byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
2611+- // take a lock on the log page so that the page is not flushed to
2612+- // disk interim
2613++
2614++ // take a lock on the log page so that the page is not flushed to disk in the interim
2615+ synchronized (logPages[pageIndex]) {
2616+- if (lsnValue > getLastFlushedLsn().get()) { // need to check
2617+- // again
2618+- // (this
2619+- // thread may have got
2620+- // de-scheduled and must
2621+- // refresh!)
2622
2623- }
2624-- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2625- } catch (Exception e) {
2626-- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
2627- throw new AlgebricksException(e);
2628++ // need to check again (this thread may have got de-scheduled and must refresh!)
2629++ if (lsnValue > getLastFlushedLsn().get()) {
2630++
2631+ // get the log record length
2632+ logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
2633+ byte logType = pageContent[pageOffset + 4];
2634+@@ -765,9 +624,7 @@
2635+ int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
2636+ logRecord = new byte[logRecordSize];
2637+
2638+- /*
2639+- * copy the log record content
2640+- */
2641++ // copy the log record content
2642+ System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
2643+ MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
2644+ if (logicalLogLocator == null) {
2645+@@ -790,9 +647,7 @@
2646 }
2647-- // Following jobs are run under a separate transaction, that is committed/aborted by the JobEventListener
2648-- for (JobSpecification jobspec : jobsToExecute) {
2649-- runJob(hcc, jobspec);
2650-- }
2651 }
2652- return executionResult;
2653+
2654+- /*
2655+- * the log record is residing on the disk, read it from there.
2656+- */
2657++ // the log record is residing on the disk, read it from there.
2658+ readDiskLog(lsnValue, logicalLogLocator);
2659 }
2660-@@ -289,398 +287,802 @@
2661
2662- private Dataverse handleUseDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
2663- List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException {
2664-- DataverseDecl dvd = (DataverseDecl) stmt;
2665-- String dvName = dvd.getDataverseName().getValue();
2666-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
2667-- if (dv == null) {
2668-- throw new MetadataException("Unknown dataverse " + dvName);
2669-+
2670-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2671-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2672-+ acquireReadLatch();
2673-+
2674-+ try {
2675-+ DataverseDecl dvd = (DataverseDecl) stmt;
2676-+ String dvName = dvd.getDataverseName().getValue();
2677-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
2678-+ if (dv == null) {
2679-+ throw new MetadataException("Unknown dataverse " + dvName);
2680-+ }
2681-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2682-+ return dv;
2683-+ } catch (Exception e) {
2684-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
2685-+ throw new MetadataException(e);
2686-+ } finally {
2687-+ releaseReadLatch();
2688- }
2689-- return dv;
2690+@@ -860,30 +715,40 @@
2691+ return logPageOwnerCount[pageIndex];
2692 }
2693
2694- private void handleCreateDataverseStatement(AqlMetadataProvider metadataProvider, Statement stmt,
2695- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
2696- ACIDException {
2697-- CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
2698-- String dvName = stmtCreateDataverse.getDataverseName().getValue();
2699-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
2700-- if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
2701-- throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
2702-+
2703-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2704-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2705-+ acquireWriteLatch();
2706-+
2707-+ try {
2708-+ CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
2709-+ String dvName = stmtCreateDataverse.getDataverseName().getValue();
2710-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dvName);
2711-+ if (dv != null && !stmtCreateDataverse.getIfNotExists()) {
2712-+ throw new AlgebricksException("A dataverse with this name " + dvName + " already exists.");
2713-+ }
2714-+ MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
2715-+ stmtCreateDataverse.getFormat(), IMetadataEntity.PENDING_NO_OP));
2716-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2717-+ } catch (Exception e) {
2718-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
2719-+ throw new AlgebricksException(e);
2720-+ } finally {
2721-+ releaseWriteLatch();
2722- }
2723-- MetadataManager.INSTANCE.addDataverse(metadataProvider.getMetadataTxnContext(), new Dataverse(dvName,
2724-- stmtCreateDataverse.getFormat()));
2725+- public ICommitResolver getCommitResolver() {
2726+- return commitResolver;
2727+- }
2728+-
2729+- public CommitRequestStatistics getCommitRequestStatistics() {
2730+- return commitRequestStatistics;
2731+- }
2732+-
2733+ public IFileBasedBuffer[] getLogPages() {
2734+ return logPages;
2735 }
2736
2737- private void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
2738- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws AsterixException, Exception {
2739-- DatasetDecl dd = (DatasetDecl) stmt;
2740-- String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
2741-- : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
2742-- if (dataverseName == null) {
2743-- throw new AlgebricksException(" dataverse not specified ");
2744-- }
2745-- String datasetName = dd.getName().getValue();
2746-- DatasetType dsType = dd.getDatasetType();
2747-- String itemTypeName = dd.getItemTypeName().getValue();
2748-
2749-- IDatasetDetails datasetDetails = null;
2750-- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
2751-- datasetName);
2752-- if (ds != null) {
2753-- if (dd.getIfNotExists()) {
2754-- return;
2755-- } else {
2756-- throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
2757-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2758-+ boolean bActiveTxn = true;
2759-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2760-+ acquireWriteLatch();
2761+- public int getLastFlushedPage() {
2762+- return lastFlushedPage.get();
2763+- }
2764+-
2765+- public void setLastFlushedPage(int lastFlushedPage) {
2766+- this.lastFlushedPage.set(lastFlushedPage);
2767+- }
2768+-
2769+ @Override
2770+ public TransactionSubsystem getTransactionSubsystem() {
2771+ return provider;
2772+ }
2773 +
2774-+ try {
2775-+ DatasetDecl dd = (DatasetDecl) stmt;
2776-+ String dataverseName = dd.getDataverse() != null ? dd.getDataverse().getValue()
2777-+ : activeDefaultDataverse != null ? activeDefaultDataverse.getDataverseName() : null;
2778-+ if (dataverseName == null) {
2779-+ throw new AlgebricksException(" dataverse not specified ");
2780- }
2781-- }
2782-- Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
2783-- itemTypeName);
2784-- if (dt == null) {
2785-- throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
2786-- }
2787-- switch (dd.getDatasetType()) {
2788-- case INTERNAL: {
2789-- IAType itemType = dt.getDatatype();
2790-- if (itemType.getTypeTag() != ATypeTag.RECORD) {
2791-- throw new AlgebricksException("Can only partition ARecord's.");
2792-+ String datasetName = dd.getName().getValue();
2793-+ DatasetType dsType = dd.getDatasetType();
2794-+ String itemTypeName = dd.getItemTypeName().getValue();
2795++ public void decrementActiveTxnCountOnIndexes(int pageIndex) throws HyracksDataException {
2796++ TransactionContext ctx = null;
2797++ int count = 0;
2798++ int i = 0;
2799 +
2800-+ IDatasetDetails datasetDetails = null;
2801-+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
2802-+ datasetName);
2803-+ if (ds != null) {
2804-+ if (dd.getIfNotExists()) {
2805-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2806-+ return;
2807-+ } else {
2808-+ throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
2809- }
2810-- List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
2811-- .getPartitioningExprs();
2812-- String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
2813-- datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
2814-- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName);
2815-- break;
2816- }
2817-- case EXTERNAL: {
2818-- String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
2819-- Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
2820-- datasetDetails = new ExternalDatasetDetails(adapter, properties);
2821-- break;
2822-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), dataverseName,
2823-+ itemTypeName);
2824-+ if (dt == null) {
2825-+ throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
2826- }
2827-- case FEED: {
2828-- IAType itemType = dt.getDatatype();
2829-- if (itemType.getTypeTag() != ATypeTag.RECORD) {
2830-- throw new AlgebricksException("Can only partition ARecord's.");
2831-+ switch (dd.getDatasetType()) {
2832-+ case INTERNAL: {
2833-+ IAType itemType = dt.getDatatype();
2834-+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
2835-+ throw new AlgebricksException("Can only partition ARecord's.");
2836++ HashMap<TransactionContext, Integer> map = activeTxnCountMaps.get(pageIndex);
2837++ Set<Map.Entry<TransactionContext, Integer>> entrySet = map.entrySet();
2838++ if (entrySet != null) {
2839++ for (Map.Entry<TransactionContext, Integer> entry : entrySet) {
2840++ if (entry != null) {
2841++ if (entry.getValue() != null) {
2842++ count = entry.getValue();
2843 + }
2844-+ List<String> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl())
2845-+ .getPartitioningExprs();
2846-+ String ngName = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
2847-+ datasetDetails = new InternalDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
2848-+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
2849-+ ngName);
2850-+ break;
2851- }
2852-- List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs();
2853-- String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
2854-- String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
2855-- Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getConfiguration();
2856-- FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
2857-- datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
2858-- InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs, ngName,
2859-- adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
2860-- break;
2861-+ case EXTERNAL: {
2862-+ String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
2863-+ Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
2864-+ datasetDetails = new ExternalDatasetDetails(adapter, properties);
2865-+ break;
2866-+ }
2867-+ case FEED: {
2868-+ IAType itemType = dt.getDatatype();
2869-+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
2870-+ throw new AlgebricksException("Can only partition ARecord's.");
2871++ if (count > 0) {
2872++ ctx = entry.getKey();
2873++ for (i = 0; i < count; i++) {
2874++ ctx.decreaseActiveTransactionCountOnIndexes();
2875++ }
2876 + }
2877-+ List<String> partitioningExprs = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
2878-+ .getPartitioningExprs();
2879-+ String ngName = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getNodegroupName().getValue();
2880-+ String adapter = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getAdapterFactoryClassname();
2881-+ Map<String, String> configuration = ((FeedDetailsDecl) dd.getDatasetDetailsDecl())
2882-+ .getConfiguration();
2883-+ FunctionSignature signature = ((FeedDetailsDecl) dd.getDatasetDetailsDecl()).getFunctionSignature();
2884-+ datasetDetails = new FeedDatasetDetails(InternalDatasetDetails.FileStructure.BTREE,
2885-+ InternalDatasetDetails.PartitioningStrategy.HASH, partitioningExprs, partitioningExprs,
2886-+ ngName, adapter, configuration, signature, FeedDatasetDetails.FeedState.INACTIVE.toString());
2887-+ break;
2888 + }
2889- }
2890-+
2891-+ //#. add a new dataset with PendingAddOp
2892-+ Dataset dataset = new Dataset(dataverseName, datasetName, itemTypeName, datasetDetails, dsType,
2893-+ DatasetIdFactory.generateDatasetId(), IMetadataEntity.PENDING_ADD_OP);
2894-+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
2895-+
2896-+ if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
2897-+ Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
2898-+ dataverseName);
2899-+ JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
2900-+ metadataProvider);
2901-+
2902-+ //#. make metadataTxn commit before calling runJob.
2903-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2904-+ bActiveTxn = false;
2905-+
2906-+ //#. runJob
2907-+ runJob(hcc, jobSpec);
2908-+
2909-+ //#. begin new metadataTxn
2910-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2911-+ bActiveTxn = true;
2912-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2913 + }
2914++ }
2915 +
2916-+ //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
2917-+ MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
2918-+ MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
2919-+ datasetName, itemTypeName, datasetDetails, dsType, dataset.getDatasetId(),
2920-+ IMetadataEntity.PENDING_NO_OP));
2921-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2922-+ } catch (Exception e) {
2923-+ if (bActiveTxn) {
2924-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
2925-+ }
2926-+ throw new AlgebricksException(e);
2927-+ } finally {
2928-+ releaseWriteLatch();
2929- }
2930-- MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), new Dataset(dataverseName,
2931-- datasetName, itemTypeName, datasetDetails, dsType, DatasetIdFactory.generateDatasetId()));
2932-- if (dd.getDatasetType() == DatasetType.INTERNAL || dd.getDatasetType() == DatasetType.FEED) {
2933-- Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
2934-- dataverseName);
2935-- runJob(hcc, DatasetOperations.createDatasetJobSpec(dataverse, datasetName, metadataProvider));
2936-- }
2937- }
2938++ map.clear();
2939++ }
2940+ }
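
The loop above guards defensively against null entries; stripped of those checks, the drain amounts to the following sketch (types as in the patch, so this compiles only against the transaction module):

    // Sketch: once the page is durable, release each counted ENTITY_COMMIT
    // reference on the indexes and reset the page's map for reuse.
    private void drainPage(java.util.HashMap<TransactionContext, Integer> map) throws HyracksDataException {
        for (java.util.Map.Entry<TransactionContext, Integer> entry : map.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                entry.getKey().decreaseActiveTransactionCountOnIndexes();
            }
        }
        map.clear();
    }
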
2941
2942- private void handleCreateIndexStatement(AqlMetadataProvider metadataProvider, Statement stmt,
2943- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
2944-- CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
2945-- String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
2946-- : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
2947-- if (dataverseName == null) {
2948-- throw new AlgebricksException(" dataverse not specified ");
2949-- }
2950-- String datasetName = stmtCreateIndex.getDatasetName().getValue();
2951-- Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
2952-- datasetName);
2953-- if (ds == null) {
2954-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
2955-- + dataverseName);
2956-- }
2957-- String indexName = stmtCreateIndex.getIndexName().getValue();
2958-- Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
2959-- datasetName, indexName);
2960-- if (idx != null) {
2961-- if (!stmtCreateIndex.getIfNotExists()) {
2962-- throw new AlgebricksException("An index with this name " + indexName + " already exists.");
2963-- } else {
2964-- stmtCreateIndex.setNeedToCreate(false);
2965-+
2966-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
2967-+ boolean bActiveTxn = true;
2968-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
2969-+ acquireWriteLatch();
2970-+
2971-+ try {
2972-+ CreateIndexStatement stmtCreateIndex = (CreateIndexStatement) stmt;
2973-+ String dataverseName = stmtCreateIndex.getDataverseName() == null ? activeDefaultDataverse == null ? null
2974-+ : activeDefaultDataverse.getDataverseName() : stmtCreateIndex.getDataverseName().getValue();
2975-+ if (dataverseName == null) {
2976-+ throw new AlgebricksException(" dataverse not specified ");
2977- }
2978-- } else {
2979-+ String datasetName = stmtCreateIndex.getDatasetName().getValue();
2980-+
2981-+ Dataset ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
2982-+ datasetName);
2983-+ if (ds == null) {
2984-+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
2985-+ + dataverseName);
2986-+ }
2987-+
2988-+ String indexName = stmtCreateIndex.getIndexName().getValue();
2989-+ Index idx = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
2990-+ datasetName, indexName);
2991-+
2992-+ if (idx != null) {
2993-+ if (!stmtCreateIndex.getIfNotExists()) {
2994-+ throw new AlgebricksException("An index with this name " + indexName + " already exists.");
2995-+ } else {
2996-+ stmtCreateIndex.setNeedToCreate(false);
2997-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
2998-+ return;
2999-+ }
3000-+ }
3001-+
3002-+ //#. add a new index with PendingAddOp
3003- Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
3004-- stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false);
3005-+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
3006-+ IMetadataEntity.PENDING_ADD_OP);
3007- MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
3008-- runCreateIndexJob(hcc, stmtCreateIndex, metadataProvider);
3009+ /*
3010+@@ -895,36 +760,82 @@
3011+ class LogPageFlushThread extends Thread {
3012
3013-+ //#. create the index artifact in NC.
3014- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
3015- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
3016-- JobSpecification loadIndexJobSpec = IndexOperations
3017-- .buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
3018-- runJob(hcc, loadIndexJobSpec);
3019-+ JobSpecification spec = IndexOperations.buildSecondaryIndexCreationJobSpec(cis, metadataProvider);
3020-+ if (spec == null) {
3021-+ throw new AsterixException("Failed to create job spec for creating index '"
3022-+ + stmtCreateIndex.getDatasetName() + "." + stmtCreateIndex.getIndexName() + "'");
3023-+ }
3024-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3025-+ bActiveTxn = false;
3026-+
3027-+ runJob(hcc, spec);
3028-+
3029-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3030-+ bActiveTxn = true;
3031-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3032-+
3033-+ //#. load data into the index in NC.
3034-+ cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
3035-+ index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
3036-+ spec = IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider);
3037-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3038-+ bActiveTxn = false;
3039-+
3040-+ runJob(hcc, spec);
3041-+
3042-+ //#. begin new metadataTxn
3043-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3044-+ bActiveTxn = true;
3045-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3046-+
3047-+ //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
3048-+ MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
3049-+ indexName);
3050-+ index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
3051-+ stmtCreateIndex.getFieldExprs(), stmtCreateIndex.getGramLength(), false,
3052-+ IMetadataEntity.PENDING_NO_OP);
3053-+ MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
3054-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3055-+
3056-+ } catch (Exception e) {
3057-+ if (bActiveTxn) {
3058-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3059-+ }
3060-+ throw new AlgebricksException(e);
3061-+ } finally {
3062-+ releaseWriteLatch();
3063- }
3064- }
3065+ private LogManager logManager;
3066++ /*
3067++ * pendingFlushRequests is a map with key as Integer denoting the page
3068++ * index. When a (transaction) thread discovers the need to flush a page, it
3069++ * puts its Thread object into the corresponding value that is a
3070++ * LinkedBlockingQueue. The LogManager has a LogFlusher thread that scans
3071++ * this map in order of page index (and circling around). The flusher thread
3072++ * needs to flush pages in order and waits for a thread to deposit an object
3073++ * in the blocking queue corresponding to the next page in order. A request
3074++ * to flush a page is conveyed to the flush thread by simply depositing an
3075++ * object into the corresponding blocking queue. It is blocking in the sense
3076++ * that the flusher thread will continue to wait for an object to arrive in
3077++ * the queue. The object itself is ignored by the flusher and just acts as
3078++ * a signal/event that a page needs to be flushed.
3079++ */
3080++ private final LinkedBlockingQueue<Object>[] flushRequestQueue;
3081++ private final Object[] flushRequests;
3082++ private int lastFlushedPageIndex;
3083++ private final long groupCommitWaitPeriod;
3084
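
A condensed, runnable sketch of the signaling scheme that comment describes; the class and method names here are illustrative, not the patch's:

    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch: one single-slot queue per page. The deposited token is pure
    // signal; its identity never matters to the flusher.
    final class FlushSignals {
        private final LinkedBlockingQueue<Object>[] queues;
        private final Object[] tokens;

        @SuppressWarnings("unchecked")
        FlushSignals(int numPages) {
            queues = new LinkedBlockingQueue[numPages];
            tokens = new Object[numPages];
            for (int i = 0; i < numPages; i++) {
                queues[i] = new LinkedBlockingQueue<Object>(1); // capacity 1: duplicate requests collapse
                tokens[i] = new Object();
            }
        }

        void signal(int page) {
            queues[page].offer(tokens[page]); // no-op if a request is already pending
        }

        void await(int page) throws InterruptedException {
            queues[page].take(); // blocks until some writer signals this page
        }
    }

The capacity-1 queue is what makes "put a new request only if one is not already queued" fall out for free: offer() on a full queue simply returns false.
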
3085- private void handleCreateTypeStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3086- List<JobSpecification> jobsToExecute) throws AlgebricksException, RemoteException, ACIDException,
3087- MetadataException {
3088-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3089-- TypeDecl stmtCreateType = (TypeDecl) stmt;
3090-- String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
3091-- : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
3092-- if (dataverseName == null) {
3093-- throw new AlgebricksException(" dataverse not specified ");
3094-- }
3095-- String typeName = stmtCreateType.getIdent().getValue();
3096-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
3097-- if (dv == null) {
3098-- throw new AlgebricksException("Unknonw dataverse " + dataverseName);
3099-- }
3100-- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
3101-- if (dt != null) {
3102-- if (!stmtCreateType.getIfNotExists())
3103-- throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
3104-- } else {
3105-- if (builtinTypeMap.get(typeName) != null) {
3106-- throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
3107-+
3108-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3109-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3110-+ acquireWriteLatch();
3111-+
3112-+ try {
3113-+ TypeDecl stmtCreateType = (TypeDecl) stmt;
3114-+ String dataverseName = stmtCreateType.getDataverseName() == null ? activeDefaultDataverse == null ? null
3115-+ : activeDefaultDataverse.getDataverseName() : stmtCreateType.getDataverseName().getValue();
3116-+ if (dataverseName == null) {
3117-+ throw new AlgebricksException(" dataverse not specified ");
3118-+ }
3119-+ String typeName = stmtCreateType.getIdent().getValue();
3120-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
3121-+ if (dv == null) {
3122-+ throw new AlgebricksException("Unknonw dataverse " + dataverseName);
3123-+ }
3124-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
3125-+ if (dt != null) {
3126-+ if (!stmtCreateType.getIfNotExists()) {
3127-+ throw new AlgebricksException("A datatype with this name " + typeName + " already exists.");
3128-+ }
3129- } else {
3130-- Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
3131-- dataverseName);
3132-- TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
3133-- IAType type = typeMap.get(typeSignature);
3134-- MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
3135-+ if (builtinTypeMap.get(typeName) != null) {
3136-+ throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
3137-+ } else {
3138-+ Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, (TypeDecl) stmt,
3139-+ dataverseName);
3140-+ TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
3141-+ IAType type = typeMap.get(typeSignature);
3142-+ MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
3143-+ }
3144- }
3145-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3146-+ } catch (Exception e) {
3147-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3148-+ throw new AlgebricksException(e);
3149-+ } finally {
3150-+ releaseWriteLatch();
3151- }
3152+ public LogPageFlushThread(LogManager logManager) {
3153+ this.logManager = logManager;
3154+ setName("Flusher");
3155++ int numLogPages = logManager.getLogManagerProperties().getNumLogPages();
3156++ this.flushRequestQueue = new LinkedBlockingQueue[numLogPages];
3157++ this.flushRequests = new Object[numLogPages];
3158++ for (int i = 0; i < numLogPages; i++) {
3159++ flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
3160++ flushRequests[i] = new Object();
3161++ }
3162++ this.lastFlushedPageIndex = -1;
3163++ groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
3164 }
3165
3166- private void handleDataverseDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3167- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
3168-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3169-- DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
3170-- String dvName = stmtDelete.getDataverseName().getValue();
3171-
3172-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
3173-- if (dv == null) {
3174-- if (!stmtDelete.getIfExists()) {
3175-- throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
3176-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3177-+ boolean bActiveTxn = true;
3178-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3179-+ acquireWriteLatch();
3180-+
3181-+ try {
3182-+ DataverseDropStatement stmtDelete = (DataverseDropStatement) stmt;
3183-+ String dvName = stmtDelete.getDataverseName().getValue();
3184-+
3185-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dvName);
3186-+ if (dv == null) {
3187-+ if (!stmtDelete.getIfExists()) {
3188-+ throw new AlgebricksException("There is no dataverse with this name " + dvName + ".");
3189-+ }
3190-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3191++ public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
3192++ synchronized (logManager.getLogPage(pageIndex)) {
3193++ //return if flushedLSN >= lsn
3194++ if (logManager.getLastFlushedLsn().get() >= lsn) {
3195 + return;
3196- }
3197-- } else {
3198-+
3199-+ //#. prepare jobs which will drop corresponding datasets with indexes.
3200- List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dvName);
3201- for (int j = 0; j < datasets.size(); j++) {
3202- String datasetName = datasets.get(j).getDatasetName();
3203- DatasetType dsType = datasets.get(j).getDatasetType();
3204- if (dsType == DatasetType.INTERNAL || dsType == DatasetType.FEED) {
3205-+
3206- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dvName, datasetName);
3207- for (int k = 0; k < indexes.size(); k++) {
3208- if (indexes.get(k).isSecondaryIndex()) {
3209-- compileIndexDropStatement(hcc, dvName, datasetName, indexes.get(k).getIndexName(),
3210-- metadataProvider);
3211-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dvName, datasetName,
3212-+ indexes.get(k).getIndexName());
3213-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
3214- }
3215- }
3216-+
3217-+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dvName, datasetName);
3218-+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
3219- }
3220-- compileDatasetDropStatement(hcc, dvName, datasetName, metadataProvider);
3221- }
3222-
3223-+ //#. mark PendingDropOp on the dataverse record by
3224-+ // first, deleting the dataverse record from the DATAVERSE_DATASET
3225-+ // second, inserting the dataverse record with the PendingDropOp value into the DATAVERSE_DATASET
3226- MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
3227-+ MetadataManager.INSTANCE.addDataverse(mdTxnCtx, new Dataverse(dvName, dv.getDataFormat(),
3228-+ IMetadataEntity.PENDING_DROP_OP));
3229-+
3230-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3231-+ bActiveTxn = false;
3232-+
3233-+ for (JobSpecification jobSpec : jobsToExecute) {
3234-+ runJob(hcc, jobSpec);
3235 + }
3236 +
3237-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3238-+ bActiveTxn = true;
3239-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3240++ //put a new request to the queue only if the request on the page is not in the queue.
3241++ flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]);
3242 +
3243-+ //#. finally, delete the dataverse.
3244-+ MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dvName);
3245- if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dvName) {
3246- activeDefaultDataverse = null;
3247- }
3248-+
3249-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3250-+ } catch (Exception e) {
3251-+ if (bActiveTxn) {
3252-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3253++ //return if the request is asynchronous
3254++ if (!isSynchronous) {
3255++ return;
3256 + }
3257-+ throw new AlgebricksException(e);
3258-+ } finally {
3259-+ releaseWriteLatch();
3260- }
3261- }
3262-
3263- private void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3264- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
3265-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3266-- DropStatement stmtDelete = (DropStatement) stmt;
3267-- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
3268-- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
3269-- if (dataverseName == null) {
3270-- throw new AlgebricksException(" dataverse not specified ");
3271-- }
3272-- String datasetName = stmtDelete.getDatasetName().getValue();
3273-- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
3274-- if (ds == null) {
3275-- if (!stmtDelete.getIfExists())
3276-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
3277-- + dataverseName + ".");
3278-- } else {
3279 +
3280-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3281-+ boolean bActiveTxn = true;
3282-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3283-+ acquireWriteLatch();
3284-+
3285-+ try {
3286-+ DropStatement stmtDelete = (DropStatement) stmt;
3287-+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
3288-+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
3289-+ if (dataverseName == null) {
3290-+ throw new AlgebricksException(" dataverse not specified ");
3291-+ }
3292-+ String datasetName = stmtDelete.getDatasetName().getValue();
3293-+
3294-+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
3295-+ if (ds == null) {
3296-+ if (!stmtDelete.getIfExists()) {
3297-+ throw new AlgebricksException("There is no dataset with this name " + datasetName
3298-+ + " in dataverse " + dataverseName + ".");
3299++ //wait until the flush is complete.
3300++ boolean isNotified = false;
3301++ while (!isNotified) {
3302++ try {
3303++ logManager.getLogPage(pageIndex).wait();
3304++ isNotified = true;
3305++ } catch (InterruptedException e) {
3306++ e.printStackTrace();
3307 + }
3308-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3309-+ return;
3310 + }
3311++ }
3312++ }
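
For the synchronous case, the caller couples the queue signal with a monitor wait on the page, re-checking the flushed LSN so that neither a lost wakeup nor a spurious one can strand it. A sketch of that handshake, assuming the flusher calls notifyAll() on the page monitor after advancing the flushed LSN (flushedLsn and the Runnable are illustrative, not the patch's API):

    // Sketch: block the committer until its LSN is durable.
    final class SyncFlushWait {
        private volatile long flushedLsn = -1;

        void awaitDurable(Object pageMonitor, long lsn, Runnable signalFlush) throws InterruptedException {
            synchronized (pageMonitor) {
                if (flushedLsn >= lsn) {
                    return;                    // already on disk
                }
                signalFlush.run();             // deposit the flush token (see requestFlush above)
                while (flushedLsn < lsn) {     // loop: wait() may wake spuriously
                    pageMonitor.wait();
                }
            }
        }
    }
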
3313 +
3314- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
3315-+
3316-+ //#. prepare jobs to drop the dataset and the indexes in NC
3317- List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
3318- for (int j = 0; j < indexes.size(); j++) {
3319-- if (indexes.get(j).isPrimaryIndex()) {
3320-- compileIndexDropStatement(hcc, dataverseName, datasetName, indexes.get(j).getIndexName(),
3321-- metadataProvider);
3322-+ if (indexes.get(j).isSecondaryIndex()) {
3323-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
3324-+ indexes.get(j).getIndexName());
3325-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
3326+ @Override
3327+ public void run() {
3328+ while (true) {
3329+ try {
3330+- int pageToFlush = logManager.getNextPageInSequence(logManager.getLastFlushedPage());
3331++ int pageToFlush = logManager.getNextPageInSequence(lastFlushedPageIndex);
3332+
3333+- /*
3334+- * A wait call on the linkedBLockingQueue. The flusher thread is
3335+- * notified when an object is added to the queue. Please note
3336+- * that each page has an associated blocking queue.
3337+- */
3338+- logManager.getPendingFlushRequests(pageToFlush).take();
3339++ // A wait call on the LinkedBlockingQueue. The flusher thread is
3340++ // notified when an object is added to the queue. Please note
3341++ // that each page has an associated blocking queue.
3342++ flushRequestQueue[pageToFlush].take();
3343+
3344+- /*
3345+- * The LogFlusher was waiting for a page to be marked as a
3346+- * candidate for flushing. Now that has happened. The thread
3347+- * shall proceed to take a lock on the log page
3348+- */
3349+- synchronized (logManager.getLogPages()[pageToFlush]) {
3350++ synchronized (logManager.getLogPage(pageToFlush)) {
3351+
3352+- /*
3353+- * lock the internal state of the log manager and create a
3354+- * log file if necessary.
3355+- */
3356++ // lock the internal state of the log manager and create a
3357++ // log file if necessary.
3358+ int prevLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get());
3359+ int nextLogFileId = logManager.getLogFileId(logManager.getLastFlushedLsn().get()
3360+ + logManager.getLogManagerProperties().getLogPageSize());
3361+@@ -936,198 +847,60 @@
3362+ logManager.getLogManagerProperties().getLogPageSize());
3363 }
3364- }
3365-+ CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
3366-+ jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
3367-+
3368-+ //#. mark the existing dataset as PendingDropOp
3369-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
3370-+ MetadataManager.INSTANCE.addDataset(
3371-+ mdTxnCtx,
3372-+ new Dataset(dataverseName, datasetName, ds.getItemTypeName(), ds.getDatasetDetails(), ds
3373-+ .getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
3374-+
3375-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3376-+ bActiveTxn = false;
3377-+
3378-+ //#. run the jobs
3379-+ for (JobSpecification jobSpec : jobsToExecute) {
3380-+ runJob(hcc, jobSpec);
3381-+ }
3382-+
3383-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3384-+ bActiveTxn = true;
3385-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3386- }
3387-- compileDatasetDropStatement(hcc, dataverseName, datasetName, metadataProvider);
3388-+
3389-+ //#. finally, delete the dataset.
3390-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
3391-+
3392-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3393-+ } catch (Exception e) {
3394-+ if (bActiveTxn) {
3395-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3396-+ }
3397-+ throw new AlgebricksException(e);
3398-+ } finally {
3399-+ releaseWriteLatch();
3400- }
3401- }
3402
3403- private void handleIndexDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3404- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
3405-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3406-- IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
3407-- String datasetName = stmtIndexDrop.getDatasetName().getValue();
3408-- String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
3409-- : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
3410-- if (dataverseName == null) {
3411-- throw new AlgebricksException(" dataverse not specified ");
3412-+
3413-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3414-+ boolean bActiveTxn = true;
3415-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3416-+ acquireWriteLatch();
3417-+
3418-+ try {
3419-+ IndexDropStatement stmtIndexDrop = (IndexDropStatement) stmt;
3420-+ String datasetName = stmtIndexDrop.getDatasetName().getValue();
3421-+ String dataverseName = stmtIndexDrop.getDataverseName() == null ? activeDefaultDataverse == null ? null
3422-+ : activeDefaultDataverse.getDataverseName() : stmtIndexDrop.getDataverseName().getValue();
3423-+ if (dataverseName == null) {
3424-+ throw new AlgebricksException(" dataverse not specified ");
3425-+ }
3426-+
3427-+ Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
3428-+ if (ds == null) {
3429-+ throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
3430-+ + dataverseName);
3431-+ }
3432-+
3433-+ if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
3434-+ String indexName = stmtIndexDrop.getIndexName().getValue();
3435-+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
3436-+ if (index == null) {
3437-+ if (!stmtIndexDrop.getIfExists()) {
3438-+ throw new AlgebricksException("There is no index with this name " + indexName + ".");
3439-+ }
3440-+ } else {
3441-+ //#. prepare a job to drop the index in NC.
3442-+ CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
3443-+ indexName);
3444-+ jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
3445-+
3446-+ //#. mark PendingDropOp on the existing index
3447-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
3448-+ MetadataManager.INSTANCE.addIndex(
3449-+ mdTxnCtx,
3450-+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index
3451-+ .getKeyFieldNames(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
3452-+
3453-+ //#. commit the existing transaction before calling runJob.
3454-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3455-+ bActiveTxn = false;
3456-+
3457-+ for (JobSpecification jobSpec : jobsToExecute) {
3458-+ runJob(hcc, jobSpec);
3459-+ }
3460-+
3461-+ //#. begin a new transaction
3462-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3463-+ bActiveTxn = true;
3464-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3465-+
3466-+ //#. finally, delete the existing index
3467-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
3468-+ }
3469-+ } else {
3470-+ throw new AlgebricksException(datasetName
3471-+ + " is an external dataset. Indexes are not maintained for external datasets.");
3472-+ }
3473-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3474-+
3475-+ } catch (Exception e) {
3476-+ if (bActiveTxn) {
3477-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3478-+ }
3479-+ throw new AlgebricksException(e);
3480-+
3481-+ } finally {
3482-+ releaseWriteLatch();
3483- }
3484-- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
3485-- if (ds == null)
3486-- throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
3487-- + dataverseName);
3488-- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
3489-- String indexName = stmtIndexDrop.getIndexName().getValue();
3490-- Index idx = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
3491-- if (idx == null) {
3492-- if (!stmtIndexDrop.getIfExists())
3493-- throw new AlgebricksException("There is no index with this name " + indexName + ".");
3494-- } else
3495-- compileIndexDropStatement(hcc, dataverseName, datasetName, indexName, metadataProvider);
3496-- } else {
3497-- throw new AlgebricksException(datasetName
3498-- + " is an external dataset. Indexes are not maintained for external datasets.");
3499-- }
3500- }
3501+- logManager.getLogPage(pageToFlush).flush(); // put the
3502+- // content to
3503+- // disk, the
3504+- // thread still
3505+- // has a lock on
3506+- // the log page
3507++ //#. sleep for the groupCommitWaitPeriod
3508++ sleep(groupCommitWaitPeriod);
3509
3510- private void handleTypeDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3511- List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
3512- ACIDException {
3513-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3514-- TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
3515-- String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
3516-- : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
3517-- if (dataverseName == null) {
3518-- throw new AlgebricksException(" dataverse not specified ");
3519-+
3520-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3521-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3522-+ acquireWriteLatch();
3523-+
3524-+ try {
3525-+ TypeDropStatement stmtTypeDrop = (TypeDropStatement) stmt;
3526-+ String dataverseName = stmtTypeDrop.getDataverseName() == null ? (activeDefaultDataverse == null ? null
3527-+ : activeDefaultDataverse.getDataverseName()) : stmtTypeDrop.getDataverseName().getValue();
3528-+ if (dataverseName == null) {
3529-+ throw new AlgebricksException(" dataverse not specified ");
3530-+ }
3531-+ String typeName = stmtTypeDrop.getTypeName().getValue();
3532-+ Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
3533-+ if (dt == null) {
3534-+ if (!stmtTypeDrop.getIfExists())
3535-+ throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
3536-+ } else {
3537-+ MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
3538-+ }
3539-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3540-+ } catch (Exception e) {
3541-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3542-+ throw new AlgebricksException(e);
3543-+ } finally {
3544-+ releaseWriteLatch();
3545- }
3546-- String typeName = stmtTypeDrop.getTypeName().getValue();
3547-- Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
3548-- if (dt == null) {
3549-- if (!stmtTypeDrop.getIfExists())
3550-- throw new AlgebricksException("There is no datatype with this name " + typeName + ".");
3551-- } else {
3552-- MetadataManager.INSTANCE.dropDatatype(mdTxnCtx, dataverseName, typeName);
3553-- }
3554- }
3555+- /*
3556+- * acquire lock on the log manager as we need to update the
3557+- * internal bookkeeping data.
3558+- */
3559++ //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page.
3560++ logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
3561
3562- private void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3563- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
3564- ACIDException {
3565-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3566-- NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
3567-- String nodegroupName = stmtDelete.getNodeGroupName().getValue();
3568-- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
3569-- if (ng == null) {
3570-- if (!stmtDelete.getIfExists())
3571-- throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
3572-- } else {
3573-- MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
3574-+
3575-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3576-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3577-+ acquireWriteLatch();
3578-+
3579-+ try {
3580-+ NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
3581-+ String nodegroupName = stmtDelete.getNodeGroupName().getValue();
3582-+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodegroupName);
3583-+ if (ng == null) {
3584-+ if (!stmtDelete.getIfExists())
3585-+ throw new AlgebricksException("There is no nodegroup with this name " + nodegroupName + ".");
3586-+ } else {
3587-+ MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, nodegroupName);
3588-+ }
3589-+
3590-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3591-+ } catch (Exception e) {
3592-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3593-+ throw new AlgebricksException(e);
3594-+ } finally {
3595-+ releaseWriteLatch();
3596- }
3597- }
3598+- // increment the last flushed lsn.
3599+- long lastFlushedLsn = logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties()
3600+- .getLogPageSize());
3601++ //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER)
3602++ // meaning everyone has finished writing logs on this page.
3603++ while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
3604++ sleep(0);
3605++ }
3606
3607- private void handleCreateFunctionStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3608- List<JobSpecification> jobsToExecute) throws AlgebricksException, MetadataException, RemoteException,
3609- ACIDException {
3610-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3611-- CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
3612-- String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
3613-- : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
3614-- if (dataverse == null) {
3615-- throw new AlgebricksException(" dataverse not specified ");
3616-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3617-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3618-+ acquireWriteLatch();
3619-+
3620-+ try {
3621-+ CreateFunctionStatement cfs = (CreateFunctionStatement) stmt;
3622-+ String dataverse = cfs.getSignature().getNamespace() == null ? activeDefaultDataverse == null ? null
3623-+ : activeDefaultDataverse.getDataverseName() : cfs.getSignature().getNamespace();
3624-+ if (dataverse == null) {
3625-+ throw new AlgebricksException(" dataverse not specified ");
3626-+ }
3627-+ Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
3628-+ if (dv == null) {
3629-+ throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
3630-+ }
3631-+ Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
3632-+ .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
3633-+ Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
3634-+ MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
3635-+
3636-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3637-+ } catch (Exception e) {
3638-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3639-+ throw new AlgebricksException(e);
3640-+ } finally {
3641-+ releaseWriteLatch();
3642- }
3643-- Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
3644-- if (dv == null) {
3645-- throw new AlgebricksException("There is no dataverse with this name " + dataverse + ".");
3646-- }
3647-- Function function = new Function(dataverse, cfs.getaAterixFunction().getName(), cfs.getaAterixFunction()
3648-- .getArity(), cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
3649-- Function.LANGUAGE_AQL, FunctionKind.SCALAR.toString());
3650-- MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
3651- }
3652+- /*
3653+- * the log manager gains back ownership of the page. this is
3654+- * reflected by incrementing the owner count of the page.
3655+- * recall that when the page is begin flushed the owner
3656+- * count is actually 0 Value of zero implicitly indicates
3657+- * that the page is operated upon by the log flusher thread.
3658+- */
3659+- logManager.getLogPageOwnershipCount(pageToFlush).incrementAndGet();
3660++                        //#. set the logPageOwnerCount to 0 (LOG_FLUSHER),
3661++                        //   meaning the flusher now owns the page and is flushing it.
3662++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
3663
3664- private void handleFunctionDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3665- List<JobSpecification> jobsToExecute) throws MetadataException, RemoteException, ACIDException,
3666- AlgebricksException {
3667-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3668-- FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
3669-- FunctionSignature signature = stmtDropFunction.getFunctionSignature();
3670-- Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
3671-- if (function == null) {
3672-- if (!stmtDropFunction.getIfExists())
3673--            throw new AlgebricksException("Unknown function " + signature);
3674-- } else {
3675-- MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
3676-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3677-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3678-+ acquireWriteLatch();
3679-+
3680-+ try {
3681-+ FunctionDropStatement stmtDropFunction = (FunctionDropStatement) stmt;
3682-+ FunctionSignature signature = stmtDropFunction.getFunctionSignature();
3683-+ Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
3684-+ if (function == null) {
3685-+ if (!stmtDropFunction.getIfExists())
3686-+                    throw new AlgebricksException("Unknown function " + signature);
3687-+ } else {
3688-+ MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
3689-+ }
3690-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3691-+ } catch (Exception e) {
3692-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3693-+ throw new AlgebricksException(e);
3694-+ } finally {
3695-+ releaseWriteLatch();
3696- }
3697- }
3698+- /*
3699+- * get the number of log buffers that have been written so
3700+- * far. A log buffer = number of log pages * size of a log
3701+- * page
3702+- */
3703+- int numCycles = (int) lastFlushedLsn / logManager.getLogManagerProperties().getLogBufferSize();
3704+- if (lastFlushedLsn % logManager.getLogManagerProperties().getLogBufferSize() == 0) {
3705+- numCycles--;
3706+- }
3707++ // put the content to disk (the thread still has a lock on the log page)
3708++ logManager.getLogPage(pageToFlush).flush();
3709
3710- private void handleLoadFromFileStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3711- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
3712-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
3713-- LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
3714-- String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
3715-- : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
3716-- CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName()
3717-- .getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
3718+- /*
3719+- * Map the log page to a new region in the log file.
3720+- */
3721++ // increment the last flushed lsn and lastFlushedPage
3722++ logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
3723++ lastFlushedPageIndex = pageToFlush;
3724
3725-- IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
3726-- Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
3727-- jobsToExecute.add(job.getJobSpec());
3728-- // Also load the dataset's secondary indexes.
3729-- List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
3730-- .getDatasetName().getValue());
3731-- for (Index index : datasetIndexes) {
3732-- if (!index.isSecondaryIndex()) {
3733-- continue;
3734-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3735-+ boolean bActiveTxn = true;
3736-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3737-+ acquireReadLatch();
3738++ // decrement activeTxnCountOnIndexes
3739++ logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
3740 +
3741-+ try {
3742-+ LoadFromFileStatement loadStmt = (LoadFromFileStatement) stmt;
3743-+ String dataverseName = loadStmt.getDataverseName() == null ? activeDefaultDataverse == null ? null
3744-+ : activeDefaultDataverse.getDataverseName() : loadStmt.getDataverseName().getValue();
3745-+ CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, loadStmt
3746-+ .getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(),
3747-+ loadStmt.dataIsAlreadySorted());
3748++ // reset the count to 1
3749++ logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
3750 +
3751-+ IDataFormat format = getDataFormat(metadataProvider.getMetadataTxnContext(), dataverseName);
3752-+ Job job = DatasetOperations.createLoadDatasetJobSpec(metadataProvider, cls, format);
3753-+ jobsToExecute.add(job.getJobSpec());
3754-+ // Also load the dataset's secondary indexes.
3755-+ List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, loadStmt
3756-+ .getDatasetName().getValue());
3757-+ for (Index index : datasetIndexes) {
3758-+ if (!index.isSecondaryIndex()) {
3759-+ continue;
3760-+ }
3761-+ // Create CompiledCreateIndexStatement from metadata entity 'index'.
3762-+ CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(),
3763-+ dataverseName, index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(),
3764-+ index.getIndexType());
3765-+ jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
3766- }
3767-- // Create CompiledCreateIndexStatement from metadata entity 'index'.
3768-- CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
3769-- index.getDatasetName(), index.getKeyFieldNames(), index.getGramLength(), index.getIndexType());
3770-- jobsToExecute.add(IndexOperations.buildSecondaryIndexLoadingJobSpec(cis, metadataProvider));
3771-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3772-+ bActiveTxn = false;
3773-+
3774-+ for (JobSpecification jobspec : jobsToExecute) {
3775-+ runJob(hcc, jobspec);
3776-+ }
3777-+ } catch (Exception e) {
3778-+ if (bActiveTxn) {
3779-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3780-+ }
3781-+ throw new AlgebricksException(e);
3782-+ } finally {
3783-+ releaseReadLatch();
3784- }
3785- }
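
The load/insert/delete/query handlers use a read-latched variant of the same pattern: the metadata transaction covers only compilation, it is committed before any Hyracks job is submitted, and the bActiveTxn flag keeps the catch block from aborting a transaction that has already committed. A condensed sketch, with rewriteCompileQuery and runJob as used in this patch and the statement specifics elided:

    MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    boolean bActiveTxn = true;
    metadataProvider.setMetadataTxnContext(mdTxnCtx);
    acquireReadLatch();
    try {
        // compile against a consistent metadata snapshot
        Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
        // commit before running anything on the cluster
        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
        bActiveTxn = false;
        if (compiled.first != null) {
            runJob(hcc, compiled.first);
        }
    } catch (Exception e) {
        if (bActiveTxn) {
            // abort only if the commit above was never reached
            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
        }
        throw new AlgebricksException(e);
    } finally {
        releaseReadLatch();
    }
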
3786++ // Map the log page to a new region in the log file.
3787+ long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
3788+ + logManager.getLogManagerProperties().getLogBufferSize();
3789
3790- private void handleWriteFromQueryResultStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3791- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
3792-- metadataProvider.setWriteTransaction(true);
3793-- WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
3794-- String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
3795-- : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
3796-- CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
3797-- .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
3798+- /*
3799+- * long nextPos = (numCycles + 1)
3800+- * logManager.getLogManagerProperties() .getLogBufferSize()
3801+- * + pageToFlush logManager.getLogManagerProperties()
3802+- * .getLogPageSize();
3803+- */
3804+ logManager.resetLogPage(nextWritePosition, pageToFlush);
3805
3806-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
3807-- if (compiled.first != null) {
3808-- jobsToExecute.add(compiled.first);
3809-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3810-+ boolean bActiveTxn = true;
3811-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3812-+ acquireReadLatch();
3813-+
3814-+ try {
3815-+ metadataProvider.setWriteTransaction(true);
3816-+ WriteFromQueryResultStatement st1 = (WriteFromQueryResultStatement) stmt;
3817-+ String dataverseName = st1.getDataverseName() == null ? activeDefaultDataverse == null ? null
3818-+ : activeDefaultDataverse.getDataverseName() : st1.getDataverseName().getValue();
3819-+ CompiledWriteFromQueryResultStatement clfrqs = new CompiledWriteFromQueryResultStatement(dataverseName, st1
3820-+ .getDatasetName().getValue(), st1.getQuery(), st1.getVarCounter());
3821-+
3822-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
3823-+ clfrqs);
3824-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3825-+ bActiveTxn = false;
3826-+ if (compiled.first != null) {
3827-+ runJob(hcc, compiled.first);
3828-+ }
3829-+ } catch (Exception e) {
3830-+ if (bActiveTxn) {
3831-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3832-+ }
3833-+ throw new AlgebricksException(e);
3834-+ } finally {
3835-+ releaseReadLatch();
3836- }
3837- }
3838+ // mark the page as ACTIVE
3839+ logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
3840
3841- private void handleInsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3842- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
3843-- metadataProvider.setWriteTransaction(true);
3844-- InsertStatement stmtInsert = (InsertStatement) stmt;
3845-- String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
3846-- : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
3847-- CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
3848-- .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
3849-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
3850-- if (compiled.first != null) {
3851-- jobsToExecute.add(compiled.first);
3852-+
3853-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3854-+ boolean bActiveTxn = true;
3855-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3856-+ acquireReadLatch();
3857-+
3858-+ try {
3859-+ metadataProvider.setWriteTransaction(true);
3860-+ InsertStatement stmtInsert = (InsertStatement) stmt;
3861-+ String dataverseName = stmtInsert.getDataverseName() == null ? activeDefaultDataverse == null ? null
3862-+ : activeDefaultDataverse.getDataverseName() : stmtInsert.getDataverseName().getValue();
3863-+ CompiledInsertStatement clfrqs = new CompiledInsertStatement(dataverseName, stmtInsert.getDatasetName()
3864-+ .getValue(), stmtInsert.getQuery(), stmtInsert.getVarCounter());
3865-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
3866-+ clfrqs);
3867-+
3868-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3869-+ bActiveTxn = false;
3870-+
3871-+ if (compiled.first != null) {
3872-+ runJob(hcc, compiled.first);
3873-+ }
3874-+
3875-+ } catch (Exception e) {
3876-+ if (bActiveTxn) {
3877-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3878-+ }
3879-+ throw new AlgebricksException(e);
3880-+ } finally {
3881-+ releaseReadLatch();
3882- }
3883- }
3884+- // notify all waiting (transaction) threads.
3885+- // Transaction thread may be waiting for the page to be
3886+- // available or may have a commit log record on the page
3887+- // that got flushed.
3888+- logManager.getLogPages()[pageToFlush].notifyAll();
3889+- logManager.setLastFlushedPage(pageToFlush);
3890++                        //#. check the queue for another flush request on the same log buffer;
3891++                        //   if one is pending, simply remove it.
3892++ if (flushRequestQueue[pageToFlush].peek() != null) {
3893++ flushRequestQueue[pageToFlush].take();
3894++ }
3895
3896- private void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
3897- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
3898-- metadataProvider.setWriteTransaction(true);
3899-- DeleteStatement stmtDelete = (DeleteStatement) stmt;
3900-- String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
3901-- : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
3902-- CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
3903-- stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
3904-- stmtDelete.getVarCounter(), metadataProvider);
3905-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs);
3906-- if (compiled.first != null) {
3907-- jobsToExecute.add(compiled.first);
3908-+
3909-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
3910-+ boolean bActiveTxn = true;
3911-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
3912-+ acquireReadLatch();
3913-+
3914-+ try {
3915-+ metadataProvider.setWriteTransaction(true);
3916-+ DeleteStatement stmtDelete = (DeleteStatement) stmt;
3917-+ String dataverseName = stmtDelete.getDataverseName() == null ? activeDefaultDataverse == null ? null
3918-+ : activeDefaultDataverse.getDataverseName() : stmtDelete.getDataverseName().getValue();
3919-+ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName,
3920-+ stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getDieClause(),
3921-+ stmtDelete.getVarCounter(), metadataProvider);
3922-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(),
3923-+ clfrqs);
3924-+
3925-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
3926-+ bActiveTxn = false;
3927-+
3928-+ if (compiled.first != null) {
3929-+ runJob(hcc, compiled.first);
3930-+ }
3931-+
3932-+ } catch (Exception e) {
3933-+ if (bActiveTxn) {
3934-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
3935-+ }
3936-+ throw new AlgebricksException(e);
3937-+ } finally {
3938-+ releaseReadLatch();
3939++ // notify all waiting (transaction) threads.
3940++ logManager.getLogPage(pageToFlush).notifyAll();
3941+ }
3942+ } catch (IOException ioe) {
3943+ ioe.printStackTrace();
3944+ throw new Error(" exception in flushing log page", ioe);
3945+ } catch (InterruptedException e) {
3946+ e.printStackTrace();
3947+- break; // must break from the loop as the exception indicates
3948+- // some thing horrendous has happened elsewhere
3949++ break;
3950+ }
3951 }
3952 }
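
The flusher hunks above reduce to a small state machine on each page's ownership counter. The class name PageOwnershipStatus comes from the patch itself; the concrete values below are an assumption read off the "1 (LOG_WRITER)" and "0 (LOG_FLUSHER)" comments:

    // assumed values, inferred from the comments in this hunk
    class PageOwnershipStatus {
        public static final int LOG_FLUSHER = 0; // the flusher owns the page exclusively
        public static final int LOG_WRITER = 1;  // only the log writer's base count remains
    }

Writers raise the count while appending; the flusher spins until the count falls back to LOG_WRITER, sets it to LOG_FLUSHER for the disk write, resets it to LOG_WRITER after remapping the page, and finally notifies any transaction threads waiting on the page.
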
3953+-}
3954+-
3955+-/*
3956+- * TODO: By default the commit policy is to commit at each request and not have
3957+- * a group commit. The following code needs to change to support group commit.
3958+- * The code for group commit has not been tested thoroughly and is under
3959+- * development.
3960+- */
3961+-class BasicCommitResolver implements ICommitResolver {
3962+-
3963+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
3964+- CommitRequestStatistics commitRequestStatistics) {
3965+- return true;
3966+- }
3967+-
3968+- public void init(LogManager logManager) {
3969+- }
3970+-}
3971+-
3972+-class GroupCommitResolver implements ICommitResolver {
3973+-
3974+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
3975+- CommitRequestStatistics commitRequestStatistics) {
3976+- long maxCommitWait = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
3977+- long timestamp = commitRequestStatistics.getPageLevelLastCommitRequestTimestamp(pageIndex);
3978+- if (timestamp == -1) {
3979+- if (maxCommitWait == 0) {
3980+- return true;
3981+- } else {
3982+- timestamp = System.currentTimeMillis();
3983+- }
3984+- }
3985+-        long currentTime = System.currentTimeMillis();
3986+-        if (currentTime - timestamp > maxCommitWait) {
3987+- return true;
3988+- }
3989+- return false;
3990+- }
3991+-
3992+- public void init(LogManager logManager) {
3993+- GroupCommitHandlerThread groupCommitHandler = new GroupCommitHandlerThread(logManager);
3994+- groupCommitHandler.setDaemon(true);
3995+- groupCommitHandler.start();
3996+- }
3997+-
3998+- class GroupCommitHandlerThread extends Thread {
3999+-
4000+- private LogManager logManager;
4001+-
4002+- public GroupCommitHandlerThread(LogManager logManager) {
4003+- this.logManager = logManager;
4004+- setName("Group Commit Handler");
4005+- }
4006+-
4007+- @Override
4008+- public void run() {
4009+- int pageIndex = -1;
4010+- while (true) {
4011+- pageIndex = logManager.getNextPageInSequence(pageIndex);
4012+-                long lastCommitRequestTimestamp = logManager.getCommitRequestStatistics()
4013+-                        .getPageLevelLastCommitRequestTimestamp(pageIndex);
4014+-                if (lastCommitRequestTimestamp != -1
4015+-                        && System.currentTimeMillis() - lastCommitRequestTimestamp > logManager
4016+- .getLogManagerProperties().getGroupCommitWaitPeriod()) {
4017+- int dirtyCount = logManager.getLogPageOwnershipCount(pageIndex).decrementAndGet();
4018+- if (dirtyCount == 0) {
4019+- try {
4020+- logManager.getLogPageStatus(pageIndex).set(LogManager.PageState.INACTIVE);
4021+- logManager.getPendingFlushRequests(pageIndex).put(Thread.currentThread());
4022+- } catch (InterruptedException e) {
4023+- e.printStackTrace();
4024+- break;
4025+- }
4026+- logManager.getCommitRequestStatistics().committedPage(pageIndex);
4027+- }
4028+- }
4029+- }
4030+- }
4031+- }
4032+-
4033+-}
4034+-
4035+-interface ICommitResolver {
4036+- public boolean shouldCommitPage(int pageIndex, LogManager logManager,
4037+- CommitRequestStatistics commitRequestStatistics);
4038+-
4039+- public void init(LogManager logManager);
4040+-}
4041+-
4042+-/**
4043+- * Represents a collection of all commit requests by transactions for each log
4044+- * page. The requests are accumulated until the commit policy triggers a flush
4045+- * of the corresponding log page. Upon a flush of a page, all commit requests
4046+- * for the page are cleared.
4047+- */
4048+-class CommitRequestStatistics {
4049+-
4050+- AtomicInteger[] pageLevelCommitRequestCount;
4051+- AtomicLong[] pageLevelLastCommitRequestTimestamp;
4052+-
4053+- public CommitRequestStatistics(int numPages) {
4054+- pageLevelCommitRequestCount = new AtomicInteger[numPages];
4055+- pageLevelLastCommitRequestTimestamp = new AtomicLong[numPages];
4056+- for (int i = 0; i < numPages; i++) {
4057+- pageLevelCommitRequestCount[i] = new AtomicInteger(0);
4058+- pageLevelLastCommitRequestTimestamp[i] = new AtomicLong(-1L);
4059+- }
4060+- }
4061+-
4062+- public void registerCommitRequest(int pageIndex) {
4063+- pageLevelCommitRequestCount[pageIndex].incrementAndGet();
4064+- pageLevelLastCommitRequestTimestamp[pageIndex].set(System.currentTimeMillis());
4065+- }
4066+-
4067+- public long getPageLevelLastCommitRequestTimestamp(int pageIndex) {
4068+- return pageLevelLastCommitRequestTimestamp[pageIndex].get();
4069+- }
4070+-
4071+- public void committedPage(int pageIndex) {
4072+- pageLevelCommitRequestCount[pageIndex].set(0);
4073+- pageLevelLastCommitRequestTimestamp[pageIndex].set(-1L);
4074+- }
4075+-
4076+-}
4077++}
4078+\ No newline at end of file
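
For reference, the deleted GroupCommitResolver gated page flushing on a single timing predicate; restated compactly below (names taken from the removed code; this paraphrase is not part of the patch):

    // flush once the oldest pending commit request has waited past maxCommitWait;
    // with no pending request (timestamp == -1), flush only if waiting is disabled
    boolean shouldFlush(long lastCommitRequestTimestamp, long maxCommitWait) {
        if (lastCommitRequestTimestamp == -1) {
            return maxCommitWait == 0;
        }
        return System.currentTimeMillis() - lastCommitRequestTimestamp > maxCommitWait;
    }

The LogManagerProperties hunk below is related: it changes the default groupCommitWaitPeriod from 0 to 1 millisecond.
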
4079+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
4080+===================================================================
4081+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (revision 1194)
4082++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java (working copy)
4083+@@ -152,6 +152,9 @@
4084+ case LogType.UPDATE:
4085+ logTypeDisplay = "UPDATE";
4086+ break;
4087++ case LogType.ENTITY_COMMIT:
4088++ logTypeDisplay = "ENTITY_COMMIT";
4089++ break;
4090+ }
4091+ builder.append(" LSN : ").append(logicalLogLocator.getLsn());
4092+ builder.append(" Log Type : ").append(logTypeDisplay);
4093+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
4094+===================================================================
4095+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (revision 1194)
4096++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java (working copy)
4097+@@ -18,5 +18,6 @@
4098
4099-@@ -704,46 +1106,109 @@
4100+ public static final byte UPDATE = 0;
4101+ public static final byte COMMIT = 1;
4102++ public static final byte ENTITY_COMMIT = 2;
4103
4104- private void handleBeginFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
4105- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
4106-- BeginFeedStatement bfs = (BeginFeedStatement) stmt;
4107-- String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
4108-- : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
4109+ }
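
The new ENTITY_COMMIT type matters mainly to recovery: the RecoveryManager hunks below add it next to COMMIT in every switch. A condensed sketch of the analysis-pass dispatch, assuming a getLogType accessor alongside the getJobId / getDatasetId / getPKHashValue calls visible below:

    switch (logRecordHelper.getLogType(currentLogLocator)) {
        case LogType.COMMIT:
        case LogType.ENTITY_COMMIT:
            // both record types mark the (jobId, datasetId, PKHashValue) scope as committed
            tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
                    logRecordHelper.getDatasetId(currentLogLocator),
                    logRecordHelper.getPKHashValue(currentLogLocator));
            break;
        default:
            break;
    }
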
4110+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
4111+===================================================================
4112+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (revision 1194)
4113++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java (working copy)
4114+@@ -1,5 +1,5 @@
4115+ /*
4116+- * Copyright 2009-2010 by The Regents of the University of California
4117++ * Copyright 2009-2012 by The Regents of the University of California
4118+ * Licensed under the Apache License, Version 2.0 (the "License");
4119+ * you may not use this file except in compliance with the License.
4120+ * you may obtain a copy of the License from
4121+@@ -41,7 +41,7 @@
4122+ private int logPageSize = 128 * 1024; // 128 KB
4123+ private int numLogPages = 8; // number of log pages in the log buffer.
4124
4125-- CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName,
4126-- bfs.getDatasetName().getValue(), bfs.getQuery(), bfs.getVarCounter());
4127-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
4128-+ boolean bActiveTxn = true;
4129-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
4130-+ acquireReadLatch();
4131+- private long groupCommitWaitPeriod = 0; // time in milliseconds for which a
4132++ private long groupCommitWaitPeriod = 1; // time in milliseconds for which a
4133+ // commit record will wait before
4134+ // the housing page is marked for
4135+ // flushing.
4136+Index: asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
4137+===================================================================
4138+--- asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (revision 1194)
4139++++ asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java (working copy)
4140+@@ -184,6 +184,7 @@
4141+ break;
4142
4143-- Dataset dataset;
4144-- dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
4145-- .getDatasetName().getValue());
4146-- IDatasetDetails datasetDetails = dataset.getDatasetDetails();
4147-- if (datasetDetails.getDatasetType() != DatasetType.FEED) {
4148-- throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue() + " is not a feed dataset");
4149-+ try {
4150-+ BeginFeedStatement bfs = (BeginFeedStatement) stmt;
4151-+ String dataverseName = bfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
4152-+ : activeDefaultDataverse.getDataverseName() : bfs.getDatasetName().getValue();
4153-+
4154-+ CompiledBeginFeedStatement cbfs = new CompiledBeginFeedStatement(dataverseName, bfs.getDatasetName()
4155-+ .getValue(), bfs.getQuery(), bfs.getVarCounter());
4156-+
4157-+ Dataset dataset;
4158-+ dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName, bfs
4159-+ .getDatasetName().getValue());
4160-+ IDatasetDetails datasetDetails = dataset.getDatasetDetails();
4161-+ if (datasetDetails.getDatasetType() != DatasetType.FEED) {
4162-+ throw new IllegalArgumentException("Dataset " + bfs.getDatasetName().getValue()
4163-+ + " is not a feed dataset");
4164-+ }
4165-+ bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
4166-+ cbfs.setQuery(bfs.getQuery());
4167-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
4168-+
4169-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
4170-+ bActiveTxn = false;
4171-+
4172-+ if (compiled.first != null) {
4173-+ runJob(hcc, compiled.first);
4174-+ }
4175-+
4176-+ } catch (Exception e) {
4177-+ if (bActiveTxn) {
4178-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
4179-+ }
4180-+ throw new AlgebricksException(e);
4181-+ } finally {
4182-+ releaseReadLatch();
4183- }
4184-- bfs.initialize(metadataProvider.getMetadataTxnContext(), dataset);
4185-- cbfs.setQuery(bfs.getQuery());
4186-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), cbfs);
4187-- if (compiled.first != null) {
4188-- jobsToExecute.add(compiled.first);
4189-- }
4190- }
4191+ case LogType.COMMIT:
4192++ case LogType.ENTITY_COMMIT:
4193+ tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
4194+ logRecordHelper.getDatasetId(currentLogLocator),
4195+ logRecordHelper.getPKHashValue(currentLogLocator));
4196+@@ -218,6 +219,7 @@
4197+ IIndex index = null;
4198+ LocalResource localResource = null;
4199+ ILocalResourceMetadata localResourceMetadata = null;
4200++ List<Long> resourceIdList = new ArrayList<Long>();
4201
4202- private void handleControlFeedStatement(AqlMetadataProvider metadataProvider, Statement stmt,
4203- IHyracksClientConnection hcc, List<JobSpecification> jobsToExecute) throws Exception {
4204-- ControlFeedStatement cfs = (ControlFeedStatement) stmt;
4205-- String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
4206-- : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
4207-- CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(), dataverseName,
4208-- cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
4209-- jobsToExecute.add(FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider));
4210-+
4211-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
4212-+ boolean bActiveTxn = true;
4213-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
4214-+ acquireReadLatch();
4215-+
4216-+ try {
4217-+ ControlFeedStatement cfs = (ControlFeedStatement) stmt;
4218-+ String dataverseName = cfs.getDataverseName() == null ? activeDefaultDataverse == null ? null
4219-+ : activeDefaultDataverse.getDataverseName() : cfs.getDatasetName().getValue();
4220-+ CompiledControlFeedStatement clcfs = new CompiledControlFeedStatement(cfs.getOperationType(),
4221-+ dataverseName, cfs.getDatasetName().getValue(), cfs.getAlterAdapterConfParams());
4222-+ JobSpecification jobSpec = FeedOperations.buildControlFeedJobSpec(clcfs, metadataProvider);
4223-+
4224-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
4225-+ bActiveTxn = false;
4226-+
4227-+ runJob(hcc, jobSpec);
4228-+
4229-+ } catch (Exception e) {
4230-+ if (bActiveTxn) {
4231-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
4232-+ }
4233-+ throw new AlgebricksException(e);
4234-+ } finally {
4235-+ releaseReadLatch();
4236-+ }
4237- }
4238+ //#. get indexLifeCycleManager
4239+ IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
4240+@@ -272,6 +274,8 @@
4241+ index = localResourceMetadata.createIndexInstance(appRuntimeContext,
4242+ localResource.getResourceName(), localResource.getPartition());
4243+ indexLifecycleManager.register(resourceId, index);
4244++ indexLifecycleManager.open(resourceId);
4245++ resourceIdList.add(resourceId);
4246+ }
4247
4248- private QueryResult handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
4249- List<JobSpecification> jobsToExecute) throws Exception {
4250-- Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
4251-- if (compiled.first != null) {
4252-- GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
4253-- jobsToExecute.add(compiled.first);
4254-+
4255-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
4256-+ boolean bActiveTxn = true;
4257-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
4258-+ acquireReadLatch();
4259-+
4260-+ try {
4261-+ Pair<JobSpecification, FileSplit> compiled = rewriteCompileQuery(metadataProvider, query, null);
4262-+
4263-+ QueryResult queryResult = new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
4264-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
4265-+ bActiveTxn = false;
4266-+
4267-+ if (compiled.first != null) {
4268-+ GlobalConfig.ASTERIX_LOGGER.info(compiled.first.toJSON().toString(1));
4269-+ runJob(hcc, compiled.first);
4270-+ }
4271-+
4272-+ return queryResult;
4273-+ } catch (Exception e) {
4274-+ if (bActiveTxn) {
4275-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
4276-+ }
4277-+ throw new AlgebricksException(e);
4278-+ } finally {
4279-+ releaseReadLatch();
4280- }
4281-- return new QueryResult(query, compiled.second.getLocalFile().getFile().getAbsolutePath());
4282- }
4283+ /***************************************************/
4284+@@ -300,6 +304,7 @@
4285+ break;
4286
4287- private void runCreateIndexJob(IHyracksClientConnection hcc, CreateIndexStatement stmtCreateIndex,
4288-@@ -768,20 +1233,32 @@
4289- private void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt,
4290- List<JobSpecification> jobsToExecute) throws MetadataException, AlgebricksException, RemoteException,
4291- ACIDException {
4292-- MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
4293-- NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
4294-- String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
4295-- NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
4296-- if (ng != null) {
4297-- if (!stmtCreateNodegroup.getIfNotExists())
4298-- throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
4299-- } else {
4300-- List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
4301-- List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
4302-- for (Identifier id : ncIdentifiers) {
4303-- ncNames.add(id.getValue());
4304-+
4305-+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
4306-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
4307-+ acquireWriteLatch();
4308-+
4309-+ try {
4310-+ NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
4311-+ String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
4312-+ NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
4313-+ if (ng != null) {
4314-+ if (!stmtCreateNodegroup.getIfNotExists())
4315-+ throw new AlgebricksException("A nodegroup with this name " + ngName + " already exists.");
4316-+ } else {
4317-+ List<Identifier> ncIdentifiers = stmtCreateNodegroup.getNodeControllerNames();
4318-+ List<String> ncNames = new ArrayList<String>(ncIdentifiers.size());
4319-+ for (Identifier id : ncIdentifiers) {
4320-+ ncNames.add(id.getValue());
4321-+ }
4322-+ MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
4323+ case LogType.COMMIT:
4324++ case LogType.ENTITY_COMMIT:
4325+ //do nothing
4326+ break;
4327+
4328+@@ -308,6 +313,11 @@
4329 }
4330-- MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
4331-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
4332-+ } catch (Exception e) {
4333-+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
4334-+ throw new AlgebricksException(e);
4335-+ } finally {
4336-+ releaseWriteLatch();
4337 }
4338- }
4339-
4340-@@ -791,10 +1268,37 @@
4341-
4342- private void compileIndexDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
4343- String indexName, AqlMetadataProvider metadataProvider) throws Exception {
4344-+ MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
4345-+ Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
4346-+
4347-+ //#. mark PendingDropOp on the existing index
4348-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
4349-+ MetadataManager.INSTANCE.addIndex(
4350-+ mdTxnCtx,
4351-+ new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(), index
4352-+ .isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
4353- CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
4354-- runJob(hcc, IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider));
4355-- MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
4356-- indexName);
4357-+ JobSpecification jobSpec = IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider);
4358-+
4359-+ //#. commit the existing transaction before calling runJob.
4360-+ // the caller should begin the transaction before calling this function.
4361-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
4362-+
4363-+ try {
4364-+ runJob(hcc, jobSpec);
4365-+ } catch (Exception e) {
4366-+            //need to create a new mdTxnCtx so that the caller can abort it properly
4367-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
4368-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
4369-+ throw e;
4370+
4371++ //close all indexes
4372++ for (long r : resourceIdList) {
4373++ indexLifecycleManager.close(r);
4374 + }
4375-+
4376-+ //#. begin a new transaction
4377-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
4378-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
4379-+
4380-+ //#. finally, delete the existing index
4381-+ MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
4382++
4383+ JobIdFactory.initJobId(maxJobId);
4384 }
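
Taken together, the RecoveryManager hunks above bracket the redo pass: every index touched during redo is registered, opened, and remembered by resource id, and each remembered index is closed once redo completes. In outline, with indexLifecycleManager and resourceIdList as in the patch:

    List<Long> resourceIdList = new ArrayList<Long>();
    // while redoing: make the index operational and remember it
    indexLifecycleManager.register(resourceId, index);
    indexLifecycleManager.open(resourceId);
    resourceIdList.add(resourceId);
    // ... redo the winner transactions' updates ...
    // afterwards: release everything opened above
    for (long r : resourceIdList) {
        indexLifecycleManager.close(r);
    }
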
4385
4386- private void compileDatasetDropStatement(IHyracksClientConnection hcc, String dataverseName, String datasetName,
4387-@@ -803,10 +1307,32 @@
4388- CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
4389- Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
4390- if (ds.getDatasetType() == DatasetType.INTERNAL || ds.getDatasetType() == DatasetType.FEED) {
4391-- JobSpecification[] jobSpecs = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
4392-- for (JobSpecification spec : jobSpecs)
4393-- runJob(hcc, spec);
4394-+ JobSpecification jobSpec = DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider);
4395-+
4396-+ //#. mark PendingDropOp on the existing dataset
4397-+ MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
4398-+ MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(dataverseName, datasetName, ds.getItemTypeName(),
4399-+ ds.getDatasetDetails(), ds.getDatasetType(), ds.getDatasetId(), IMetadataEntity.PENDING_DROP_OP));
4400-+
4401-+ //#. commit the transaction
4402-+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
4403-+
4404-+ //#. run the job
4405-+ try {
4406-+ runJob(hcc, jobSpec);
4407-+ } catch (Exception e) {
4408-+                //need to create a new mdTxnCtx so that the caller can abort it properly
4409-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
4410-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
4411-+ throw e;
4412+@@ -539,6 +549,7 @@
4413+ break;
4414+
4415+ case LogType.COMMIT:
4416++ case LogType.ENTITY_COMMIT:
4417+ undoLSNSet = loserTxnTable.get(tempKeyTxnId);
4418+ if (undoLSNSet != null) {
4419+ loserTxnTable.remove(tempKeyTxnId);
4420+Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java
4421+===================================================================
4422+--- asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (revision 1194)
4423++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/metadata/MetadataTest.java (working copy)
4424+@@ -42,6 +42,16 @@
4425+ List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
4426+ for (CompilationUnit cUnit : cUnits) {
4427+ File testFile = tcCtx.getTestFile(cUnit);
4428++
4429++ /*****************
4430++ if (!testFile.getAbsolutePath().contains("meta09.aql")) {
4431++ System.out.println(testFile.getAbsolutePath());
4432++ continue;
4433 + }
4434-+
4435-+ //#. start a new transaction
4436-+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
4437-+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
4438- }
4439-+
4440-+ //#. finally, delete the existing dataset.
4441- MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
4442- }
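
compileIndexDropStatement and compileDatasetDropStatement above now share a two-phase drop: mark the metadata entity PENDING_DROP_OP and commit, run the physical drop job with no metadata transaction open, then begin a fresh transaction and delete the entity for good. Sketched for the index case, with names as they appear in the patch:

    // phase 1: mark PendingDropOp on the entity, then commit the caller's transaction
    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
    MetadataManager.INSTANCE.addIndex(mdTxnCtx, new Index(dataverseName, datasetName, indexName,
            index.getIndexType(), index.getKeyFieldNames(), index.isPrimaryIndex(),
            IMetadataEntity.PENDING_DROP_OP));
    MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);

    // phase 2: the drop job runs outside any metadata transaction
    try {
        runJob(hcc, jobSpec);
    } catch (Exception e) {
        // hand the caller a live transaction so its abort path stays valid
        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
        metadataProvider.setMetadataTxnContext(mdTxnCtx);
        throw e;
    }

    // phase 3: begin a new transaction and remove the pending entry
    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
    metadataProvider.setMetadataTxnContext(mdTxnCtx);
    MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
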
4443++ System.out.println(testFile.getAbsolutePath());
4444++ *****************/
4445++
4446++
4447+ File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
4448+ File actualFile = new File(PATH_ACTUAL + File.separator
4449+ + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
4450+Index: asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
4451+===================================================================
4452+--- asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (revision 1194)
4453++++ asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java (working copy)
4454+@@ -95,9 +95,10 @@
4455+ File testFile = tcCtx.getTestFile(cUnit);
4456
4457-@@ -831,4 +1357,20 @@
4458- }
4459- return format;
4460- }
4461-+
4462-+ private void acquireWriteLatch() {
4463-+ cacheLatch.writeLock().lock();
4464-+ }
4465-+
4466-+ private void releaseWriteLatch() {
4467-+ cacheLatch.writeLock().unlock();
4468-+ }
4469-+
4470-+ private void acquireReadLatch() {
4471-+ cacheLatch.readLock().lock();
4472-+ }
4473-+
4474-+ private void releaseReadLatch() {
4475-+ cacheLatch.readLock().unlock();
4476-+ }
4477- }
4478+ /*************** to avoid run failure cases ****************
4479+- if (!testFile.getAbsolutePath().contains("index-selection/")) {
4480++ if (!testFile.getAbsolutePath().contains("query-issue205.aql")) {
4481+ continue;
4482+ }
4483++ System.out.println(testFile.getAbsolutePath());
4484+ ************************************************************/
4485+
4486+ File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);