Merge branch 'master' into raman/master_issue376
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
index bf8978b..4f0d931 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-delete-rtree-secondary-index.plan
@@ -3,23 +3,21 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INDEX_INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$29(ASC), $$30(ASC), $$31(ASC), $$32(ASC)] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- INSERT_DELETE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$14(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$14] |PARTITIONED|
+ -- STABLE_SORT [$$14(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$14] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- BTREE_SEARCH |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan b/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
index e6cf237..1458c66 100644
--- a/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/scan-insert-secondary-index.plan
@@ -3,28 +3,24 @@
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
-- INDEX_INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$11(ASC)] |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- INDEX_INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- INDEX_INSERT_DELETE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$10(ASC)] |PARTITIONED|
+ -- INSERT_DELETE |PARTITIONED|
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- INSERT_DELETE |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- STABLE_SORT [$$6(ASC)] |PARTITIONED|
- -- HASH_PARTITION_EXCHANGE [$$6] |PARTITIONED|
+ -- STABLE_SORT [$$6(ASC)] |PARTITIONED|
+ -- HASH_PARTITION_EXCHANGE [$$6] |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
-- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- STREAM_PROJECT |PARTITIONED|
- -- ASSIGN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- DATASOURCE_SCAN |PARTITIONED|
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/events/PatternCreator.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/events/PatternCreator.java
index 4216978..0f382f9 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/events/PatternCreator.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/events/PatternCreator.java
@@ -315,14 +315,11 @@
List<Pattern> patternList = new ArrayList<Pattern>();
Cluster cluster = instance.getCluster();
Nodeid nodeid = null;
- String pargs = null;
Event event = null;
for (Node node : cluster.getNode()) {
- String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
- String primaryIODevice = iodevices.split(",")[0].trim();
- pargs = primaryIODevice + File.separator + InstallerUtil.TXN_LOG_DIR;
+ String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
nodeid = new Nodeid(new Value(null, node.getId()));
- event = new Event("file_delete", nodeid, pargs);
+ event = new Event("file_delete", nodeid, txnLogDir);
patternList.add(new Pattern(null, 1, null, event));
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/AbstractScalarAggregateDescriptor.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/AbstractScalarAggregateDescriptor.java
index 3de05f8..232eace 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/AbstractScalarAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/aggregates/scalar/AbstractScalarAggregateDescriptor.java
@@ -20,21 +20,23 @@
@Override
public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+
+ // The aggregate function will get a SingleFieldFrameTupleReference that points to the result of the ScanCollection.
+ // The list-item will always reside in the first field (column) of the SingleFieldFrameTupleReference.
+ ICopyEvaluatorFactory[] aggFuncArgs = new ICopyEvaluatorFactory[1];
+ aggFuncArgs[0] = new ColumnAccessEvalFactory(0);
+ // Create aggregate function from this scalar version.
+ FunctionIdentifier fid = AsterixBuiltinFunctions.getAggregateFunction(getIdentifier());
+ IFunctionManager mgr = FunctionManagerHolder.getFunctionManager();
+ IFunctionDescriptor fd = mgr.lookupFunction(fid);
+ AbstractAggregateFunctionDynamicDescriptor aggFuncDesc = (AbstractAggregateFunctionDynamicDescriptor) fd;
+ final ICopyAggregateFunctionFactory aggFuncFactory = aggFuncDesc.createAggregateFunctionFactory(aggFuncArgs);
+
return new ICopyEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
- // The aggregate function will get a SingleFieldFrameTupleReference that points to the result of the ScanCollection.
- // The list-item will always reside in the first field (column) of the SingleFieldFrameTupleReference.
- ICopyEvaluatorFactory[] aggFuncArgs = new ICopyEvaluatorFactory[1];
- aggFuncArgs[0] = new ColumnAccessEvalFactory(0);
- // Create aggregate function from this scalar version.
- FunctionIdentifier fid = AsterixBuiltinFunctions.getAggregateFunction(getIdentifier());
- IFunctionManager mgr = FunctionManagerHolder.getFunctionManager();
- IFunctionDescriptor fd = mgr.lookupFunction(fid);
- AbstractAggregateFunctionDynamicDescriptor aggFuncDesc = (AbstractAggregateFunctionDynamicDescriptor) fd;
- ICopyAggregateFunctionFactory aggFuncFactory = aggFuncDesc.createAggregateFunctionFactory(aggFuncArgs);
// Use ScanCollection to iterate over list items.
ScanCollectionUnnestingFunctionFactory scanCollectionFactory = new ScanCollectionUnnestingFunctionFactory(
args[0]);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
index dbe76ff..bd414de 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
@@ -223,45 +223,35 @@
*/
private void shrink() {
int i;
- boolean bContiguous = true;
- int decreaseCount = 0;
+ int removeCount = 0;
int size = pArray.size();
int maxDecreaseCount = size / 2;
ChildLockWaiterArrayManager child;
- for (i = size - 1; i >= 0; i--) {
- child = pArray.get(i);
- if (child.isEmpty() || child.isDeinitialized()) {
- if (bContiguous) {
- pArray.remove(i);
- if (++decreaseCount == maxDecreaseCount) {
- break;
- }
- } else {
- bContiguous = false;
- if (child.isEmpty()) {
- child.deinitialize();
- if (++decreaseCount == maxDecreaseCount) {
- break;
- }
- }
- }
- } else {
- bContiguous = false;
+
+ //The first buffer never be deinitialized.
+ for (i = 1; i < size; i++) {
+ if (pArray.get(i).isEmpty()) {
+ pArray.get(i).deinitialize();
}
}
- //reset allocChild when the child is removed or deinitialized.
- size = pArray.size();
- if (allocChild >= size || pArray.get(allocChild).isDeinitialized()) {
- //set allocChild to any initialized one.
- //It is guaranteed that there is at least one initialized child.
- for (i = 0; i < size; i++) {
- if (!pArray.get(i).isDeinitialized()) {
- allocChild = i;
+ //remove the empty buffers from the end
+ for (i = size - 1; i >= 1; i--) {
+ child = pArray.get(i);
+ if (child.isDeinitialized()) {
+ pArray.remove(i);
+ if (++removeCount == maxDecreaseCount) {
break;
}
+ } else {
+ break;
}
}
+
+ //reset allocChild to the first buffer
+ allocChild = 0;
+
+ isShrinkTimerOn = false;
}
public String prettyPrint() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 199fd0f..38d39cc 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -259,9 +259,16 @@
}
if (forwardPage) {
-
+ logPages[prevPage].acquireReadLatch();
+ // increment the counter as the transaction thread now holds a
+ // space in the log page and hence is an owner.
+ logPages[prevPage].incRefCnt();
+ logPages[prevPage].releaseReadLatch();
+
// forward the nextWriteOffset in the log page
- logPages[pageIndex].setBufferNextWriteOffset(logPageSize);
+ logPages[prevPage].setBufferNextWriteOffset(logPageSize);
+
+ logPages[prevPage].decRefCnt();
addFlushRequest(prevPage, old, false);
@@ -433,7 +440,7 @@
// indicating that the transaction thread has released ownership
decremented = true;
-
+
addFlushRequest(pageIndex, currentLSN, false);
} else if (logType == LogType.COMMIT) {
@@ -872,16 +879,16 @@
if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
}
-
+
// Map the log page to a new region in the log file if the flushOffset reached the logPageSize
if (afterFlushOffset == logPageSize) {
- long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
- + logBufferSize;
+ long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex]
+ .getDiskNextWriteOffset() + logBufferSize;
logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
diskNextWriteOffset, flushPageIndex);
resetFlushPageIndex = true;
}
-
+
// decrement activeTxnCountOnIndexes
logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);