Proper handling of waiting for ongoing io ops before closing an a dataset. Also added config para.
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
index 85f90e4..d7ff15d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixStorageProperties.java
@@ -31,6 +31,9 @@
private static final String STORAGE_MEMORYCOMPONENT_NUMPAGES_KEY = "storage.memorycomponent.numpages";
private static final int STORAGE_MEMORYCOMPONENT_NUMPAGES_DEFAULT = 1024; // ... so 32MB components
+ private static final String STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS_KEY = "storage.memorycomponent.numcomponents";
+ private static final int STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS_DEFAULT = 2; // 2 components
+
private static final String STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_KEY = "storage.memorycomponent.globalbudget";
private static final long STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT = 536870912; // 512MB
@@ -53,7 +56,7 @@
return accessor.getProperty(STORAGE_BUFFERCACHE_SIZE_KEY, STORAGE_BUFFERCACHE_SIZE_DEFAULT,
PropertyInterpreters.getLongPropertyInterpreter());
}
-
+
public int getBufferCacheNumPages() {
return (int) (getBufferCacheSize() / getBufferCachePageSize());
}
@@ -73,6 +76,11 @@
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public int getMemoryComponentsNum() {
+ return accessor.getProperty(STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS_KEY,
+ STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS_DEFAULT, PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
public long getMemoryComponentGlobalBudget() {
return accessor.getProperty(STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_KEY,
STORAGE_MEMORYCOMPONENT_GLOBALBUDGET_DEFAULT, PropertyInterpreters.getLongPropertyInterpreter());
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index ded7237..eea3d99 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -31,7 +31,9 @@
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
@@ -48,7 +50,6 @@
private final ILocalResourceRepository resourceRepository;
private final long capacity;
private long used;
- private final static int NUM_MUTABLE_BUFFERS = 2;
public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
ILocalResourceRepository resourceRepository) {
@@ -96,48 +97,61 @@
}
@Override
- public void unregister(long resourceID) throws HyracksDataException {
- int did;
- PrimaryIndexOperationTracker opTracker;
- DatasetInfo dsInfo;
- IndexInfo iInfo;
- synchronized (this) {
- did = getDIDfromRID(resourceID);
- dsInfo = datasetInfos.get(did);
- iInfo = dsInfo.indexes.get(resourceID);
+ public synchronized void unregister(long resourceID) throws HyracksDataException {
+ int did = getDIDfromRID(resourceID);
+ DatasetInfo dsInfo = datasetInfos.get(did);
+ IndexInfo iInfo = dsInfo.indexes.get(resourceID);
- if (dsInfo == null || iInfo == null) {
- throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
- }
+ if (dsInfo == null || iInfo == null) {
+ throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
+ }
- opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers.get(dsInfo.datasetID);
- if (iInfo.referenceCount != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
- throw new HyracksDataException("Cannot remove index while it is open.");
- }
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers.get(dsInfo.datasetID);
+ if (iInfo.referenceCount != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
+ throw new HyracksDataException("Cannot remove index while it is open.");
+ }
- while (dsInfo.numActiveIOOps > 0) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
+ // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
+
+ // First wait for any ongoing IO operations
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
}
}
+
if (iInfo.isOpen) {
- iInfo.index.deactivate(true);
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker()).getIOOperationCallback());
}
- synchronized (this) {
- dsInfo.indexes.remove(resourceID);
- if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty()) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
- assert vbcs != null;
- for (IVirtualBufferCache vbc : vbcs) {
- used -= (vbc.getNumPages() * vbc.getPageSize());
- }
- datasetInfos.remove(did);
+ // Then wait for above flush op.
+ // They are separated so they don't deadlock each other.
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
}
}
+
+ if (iInfo.isOpen) {
+ iInfo.index.deactivate(false);
+ }
+
+ dsInfo.indexes.remove(resourceID);
+ if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty()) {
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
+ assert vbcs != null;
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= (vbc.getNumPages() * vbc.getPageSize());
+ }
+ datasetInfos.remove(did);
+ }
+
}
public synchronized void declareActiveIOOperation(int datasetID) throws HyracksDataException {
@@ -208,6 +222,25 @@
.get(dsInfo.datasetID);
if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsInfo.referenceCount == 0
&& dsInfo.isOpen) {
+
+ // First wait for any ongoing IO operations
+ while (dsInfo.numActiveIOOps > 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ for (IndexInfo iInfo : dsInfo.indexes.values()) {
+ if (iInfo.isOpen) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(
+ NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker())
+ .getIOOperationCallback());
+ }
+ }
+ // Wait for the above flush ops.
while (dsInfo.numActiveIOOps > 0) {
try {
wait();
@@ -217,20 +250,22 @@
}
for (IndexInfo iInfo : dsInfo.indexes.values()) {
if (iInfo.isOpen) {
- iInfo.index.deactivate(true);
+ iInfo.index.deactivate(false);
iInfo.isOpen = false;
}
assert iInfo.referenceCount == 0;
}
+ dsInfo.isOpen = false;
List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
for (IVirtualBufferCache vbc : vbcs) {
used -= vbc.getNumPages() * vbc.getPageSize();
}
- dsInfo.isOpen = false;
return true;
+
}
}
+
return false;
}
@@ -267,10 +302,12 @@
List<IVirtualBufferCache> vbcs = datasetVirtualBufferCaches.get(datasetID);
if (vbcs == null) {
vbcs = new ArrayList<IVirtualBufferCache>();
- for (int i = 0; i < NUM_MUTABLE_BUFFERS; i++) {
- MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(
- new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(),
- datasetID < 100? storageProperties.getMemoryComponentNumPages(): 4 / NUM_MUTABLE_BUFFERS));
+ for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+ MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+ new VirtualBufferCache(new HeapBufferAllocator(),
+ storageProperties.getMemoryComponentPageSize(),
+ storageProperties.getMemoryComponentNumPages()
+ / storageProperties.getMemoryComponentsNum()));
vbcs.add(vbc);
}
datasetVirtualBufferCaches.put(datasetID, vbcs);
diff --git a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
index ca9b232..861cce5 100644
--- a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
+++ b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
@@ -325,7 +325,8 @@
List<CompilationUnit> cUnits = testCaseCtx.getTestCase().getCompilationUnit();
for (CompilationUnit cUnit : cUnits) {
- System.out.println(cUnit.getName());
+ LOGGER.info("Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/"
+ + cUnit.getName() + " ... ");
testFileCtxs = testCaseCtx.getTestFiles(cUnit);
expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit);
diff --git a/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index 6b3133c..deeb5b0 100644
--- a/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -68,6 +68,14 @@
</description>
</property>
+ <property>
+ <name>storage.memorycomponent.numcomponents</name>
+ <value>2</value>
+ <description>The number of memory components to be used per lsm index.
+ (Default = 2)
+ </description>
+ </property>
+
<property>
<name>storage.memorycomponent.globalbudget</name>
<value>536870192</value>