[ASTERIXDB-2081][STO] Introduce DatasetMemoryManager
- user model changes: no
- storage format changes: no
- interface changes: yes
Added IDatasetMemoryManager to manage datasets memory
reservation and allocation.
Details:
- Reserve metadata datasets memory to allow them to be opened
when needed.
- Add UngracefulShutdownNCApplication to force recovery
to run on AsterixHyracksIntegrationUtil.
- Refactor the use of firstAvilableUserDatasetID to check
for metadata datasets.
- Add ThreadSafe annotation.
- Add test case for RecoveryManager after creating multiple
datasets.
Change-Id: Ica76b3c8eca6f7d2ad1d962fb5ef84267c258571
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2112
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index ecf25eb..50c3ff6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -63,6 +63,7 @@
public ClusterControllerService cc;
public NodeControllerService[] ncs = new NodeControllerService[0];
public IHyracksClientConnection hcc;
+ protected boolean gracefulShutdown = true;
private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir");
private static String storagePath = DEFAULT_STORAGE_PATH;
@@ -158,6 +159,9 @@
}
protected INCApplication createNCApplication() {
+ if (!gracefulShutdown) {
+ return new UngracefulShutdownNCApplication();
+ }
return new NCApplication();
}
@@ -227,6 +231,10 @@
storagePath = path;
}
+ public void setGracefulShutdown(boolean gracefulShutdown) {
+ this.gracefulShutdown = gracefulShutdown;
+ }
+
public static void restoreDefaultStoragePath() {
storagePath = DEFAULT_STORAGE_PATH;
}
@@ -288,4 +296,11 @@
Thread.sleep(10000);
}
}
+
+ private class UngracefulShutdownNCApplication extends NCApplication {
+ @Override
+ public void stop() throws Exception {
+ // ungraceful shutdown
+ }
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 5370c03..7b08f68 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -32,6 +32,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
@@ -48,11 +49,11 @@
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetMemoryManager;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
@@ -119,6 +120,7 @@
private MessagingProperties messagingProperties;
private final NodeProperties nodeProperties;
private ExecutorService threadExecutor;
+ private IDatasetMemoryManager datasetMemoryManager;
private IDatasetLifecycleManager datasetLifecycleManager;
private IBufferCache bufferCache;
private ITransactionSubsystem txnSubsystem;
@@ -198,9 +200,10 @@
localResourceRepository.deleteStorageData(true);
}
- datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
- MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager(),
- ioManager.getIODevices().size());
+ datasetMemoryManager = new DatasetMemoryManager(storageProperties);
+ datasetLifecycleManager =
+ new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
+ datasetMemoryManager, ioManager.getIODevices().size());
isShuttingdown = false;
@@ -316,6 +319,11 @@
}
@Override
+ public IDatasetMemoryManager getDatasetMemoryManager() {
+ return datasetMemoryManager;
+ }
+
+ @Override
public double getBloomFilterFalsePositiveRate() {
return storageProperties.getBloomFilterFalsePositiveRate();
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
new file mode 100644
index 0000000..723786c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.txn;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.configuration.Property;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class RecoveryManagerTest {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+ private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml";
+ private static final String TEST_CONFIG_PATH =
+ System.getProperty("user.dir") + File.separator + "target" + File.separator + "config";
+ private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
+ private static final TestExecutor testExecutor = new TestExecutor();
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+ private static final Random random = new Random();
+ private static final int numRecords = 1;
+
+ @Before
+ public void setUp() throws Exception {
+ // Read default test configurations
+ AsterixConfiguration ac = TestHelper.getConfigurations(DEFAULT_TEST_CONFIG_FILE_NAME);
+ // override memory config to enforce dataset eviction
+ ac.getProperty().add(new Property("storage.memorycomponent.globalbudget", "128MB", ""));
+ ac.getProperty().add(new Property("storage.memorycomponent.numpages", "32", ""));
+ // Write test config file
+ TestHelper.writeConfigurations(ac, TEST_CONFIG_FILE_PATH);
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_PATH);
+ integrationUtil.setGracefulShutdown(false);
+ integrationUtil.init(true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(true);
+ }
+
+ @Test
+ public void multiDatasetRecovery() throws Exception {
+ String datasetNamePrefix = "ds_";
+ final TestCaseContext.OutputFormat format = TestCaseContext.OutputFormat.CLEAN_JSON;
+ testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int };", format);
+ int numDatasets = 50;
+ String datasetName = null;
+ for (int i = 1; i <= numDatasets; i++) {
+ datasetName = datasetNamePrefix + i;
+ testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
+ insertData(datasetName);
+ }
+ // do ungraceful shutdown to enforce recovery
+ integrationUtil.deinit(false);
+ integrationUtil.init(false);
+ validateRecovery(datasetName);
+
+ // create more datasets after recovery
+ numDatasets = 100;
+ for (int i = 51; i <= numDatasets; i++) {
+ datasetName = datasetNamePrefix + i;
+ testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
+ insertData(datasetName);
+ }
+ // do ungraceful shutdown to enforce recovery again
+ integrationUtil.deinit(false);
+ integrationUtil.init(false);
+ validateRecovery(datasetName);
+ }
+
+ private void insertData(String datasetName) throws Exception {
+ for (int i = 0; i < numRecords; i++) {
+ testExecutor.executeSqlppUpdateOrDdl("UPSERT INTO " + datasetName + " ({\"id\": " + random.nextInt() + "})",
+ TestCaseContext.OutputFormat.CLEAN_JSON);
+ }
+ }
+
+ private void validateRecovery(String datasetName) throws Exception {
+ final String query = "select value count(*) from `" + datasetName + "`;";
+ final InputStream inputStream = testExecutor
+ .executeQueryService(query, testExecutor.getEndpoint(Servlets.QUERY_SERVICE),
+ TestCaseContext.OutputFormat.CLEAN_JSON);
+ final ObjectNode jsonNodes = OBJECT_MAPPER.readValue(inputStream, ObjectNode.class);
+ JsonNode result = jsonNodes.get("results");
+ // make sure there is result
+ Assert.assertEquals(1, result.size());
+ for (int i = 0; i < result.size(); i++) {
+ JsonNode json = result.get(i);
+ Assert.assertEquals(numRecords, json.asInt());
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
new file mode 100644
index 0000000..fde2c80
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+public interface IDatasetMemoryManager {
+
+ /**
+ * Allocates memory for dataset {@code datasetId}.
+ *
+ * @param datasetId
+ * @return true, if the allocation is successful, otherwise false.
+ */
+ boolean allocate(int datasetId);
+
+ /**
+ * Deallocates memory of dataset {@code datasetId}.
+ *
+ * @param datasetId
+ */
+ void deallocate(int datasetId);
+
+ /**
+ * Reserves memory for dataset {@code datasetId}. The reserved memory
+ * is guaranteed to be allocatable when needed for the dataset. Reserve
+ * maybe called after allocation to reserve the allocated budget
+ * on deallocation.
+ *
+ * @param datasetId
+ * @return true, if the allocation is successful, otherwise false.
+ */
+ boolean reserve(int datasetId);
+
+ /**
+ * Cancels the reserved memory for dataset {@code datasetId}.
+ *
+ * @param datasetId
+ */
+ void cancelReserved(int datasetId);
+
+ /**
+ * @return The remaining memory budget that can be used for datasets.
+ */
+ long getAvailable();
+
+ /**
+ * @param datasetId
+ * @return The number of virtual buffer cache pages that should be allocated for dataset {@code datasetId}.
+ */
+ int getNumPages(int datasetId);
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index d4b9a92..548907c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -62,6 +62,8 @@
IDatasetLifecycleManager getDatasetLifecycleManager();
+ IDatasetMemoryManager getDatasetMemoryManager();
+
IResourceIdFactory getResourceIdFactory();
ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 545382a..e79f002 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -29,6 +29,7 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -51,27 +52,24 @@
import org.apache.hyracks.storage.common.LocalResource;
public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
+
private static final Logger LOGGER = Logger.getLogger(DatasetLifecycleManager.class.getName());
private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
private final StorageProperties storageProperties;
private final ILocalResourceRepository resourceRepository;
- private final int firstAvilableUserDatasetID;
- private final long capacity;
- private long used;
+ private final IDatasetMemoryManager memoryManager;
private final ILogManager logManager;
private final LogRecord logRecord;
private final int numPartitions;
private volatile boolean stopped = false;
public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
- int firstAvilableUserDatasetID, ILogManager logManager, int numPartitions) {
+ ILogManager logManager, IDatasetMemoryManager memoryManager, int numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
- this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
+ this.memoryManager = memoryManager;
this.numPartitions = numPartitions;
- capacity = storageProperties.getMemoryComponentGlobalBudget();
- used = 0;
logRecord = new LogRecord();
}
@@ -200,9 +198,10 @@
for (DatasetResource dsr : datasetsResources) {
PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
if (opTracker != null && opTracker.getNumActiveOperations() == 0
- && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen()
- && dsr.getDatasetInfo().getDatasetID() >= getFirstAvilableUserDatasetID()) {
+ && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen() && !dsr
+ .isMetadataDataset()) {
closeDataset(dsr.getDatasetInfo());
+ LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
return true;
}
}
@@ -230,8 +229,9 @@
if (dsr == null) {
DatasetInfo dsInfo = new DatasetInfo(did);
PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(did, logManager, dsInfo);
- DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
- getFirstAvilableUserDatasetID(), getNumPartitions());
+ DatasetVirtualBufferCaches vbcs =
+ new DatasetVirtualBufferCaches(did, storageProperties, memoryManager.getNumPages(did),
+ numPartitions);
dsr = new DatasetResource(dsInfo, opTracker, vbcs);
datasets.put(did, dsr);
}
@@ -322,8 +322,8 @@
}
@Override
- public synchronized void start() {
- used = 0;
+ public void start() {
+ // no op
}
@Override
@@ -449,7 +449,7 @@
public synchronized void closeUserDatasets() throws HyracksDataException {
ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
for (DatasetResource dsr : openDatasets) {
- if (dsr.getDatasetID() >= getFirstAvilableUserDatasetID()) {
+ if (!dsr.isMetadataDataset()) {
closeDataset(dsr.getDatasetInfo());
}
}
@@ -474,8 +474,8 @@
public void dumpState(OutputStream outputStream) throws IOException {
StringBuilder sb = new StringBuilder();
- sb.append(String.format("Memory budget = %d\n", capacity));
- sb.append(String.format("Memory used = %d\n", used));
+ sb.append(String.format("Memory budget = %d%n", storageProperties.getMemoryComponentGlobalBudget()));
+ sb.append(String.format("Memory available = %d%n", memoryManager.getAvailable()));
sb.append("\n");
String dsHeaderFormat = "%-10s %-6s %-16s %-12s\n";
@@ -515,7 +515,7 @@
}
synchronized (dsInfo) {
if (dsInfo.isOpen() && dsInfo.isMemoryAllocated()) {
- used -= getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
+ memoryManager.deallocate(datasetId);
dsInfo.setMemoryAllocated(false);
}
}
@@ -534,27 +534,17 @@
synchronized (dsInfo) {
// This is not needed for external datasets' indexes since they never use the virtual buffer cache.
if (!dsInfo.isMemoryAllocated() && !dsInfo.isExternal()) {
- long additionalSize = getVirtualBufferCaches(dsInfo.getDatasetID()).getTotalSize();
- while (used + additionalSize > capacity) {
+ while (!memoryManager.allocate(datasetId)) {
if (!evictCandidateDataset()) {
throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
+ " memory since memory budget would be exceeded.");
}
}
- used += additionalSize;
dsInfo.setMemoryAllocated(true);
}
}
}
- public int getFirstAvilableUserDatasetID() {
- return firstAvilableUserDatasetID;
- }
-
- public int getNumPartitions() {
- return numPartitions;
- }
-
@Override
public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
new file mode 100644
index 0000000..88f406e
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IDatasetMemoryManager;
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class DatasetMemoryManager implements IDatasetMemoryManager {
+
+ private static final Logger LOGGER = Logger.getLogger(DatasetMemoryManager.class.getName());
+ private final Map<Integer, Long> allocatedMap = new HashMap<>();
+ private final Map<Integer, Long> reservedMap = new HashMap<>();
+ private long available;
+ private final StorageProperties storageProperties;
+
+ public DatasetMemoryManager(StorageProperties storageProperties) {
+ this.storageProperties = storageProperties;
+ available = storageProperties.getMemoryComponentGlobalBudget();
+ }
+
+ @Override
+ public synchronized boolean allocate(int datasetId) {
+ if (allocatedMap.containsKey(datasetId)) {
+ throw new IllegalStateException("Memory is already allocated for dataset: " + datasetId);
+ }
+ if (reservedMap.containsKey(datasetId)) {
+ allocateReserved(datasetId);
+ return true;
+ }
+ final long required = getTotalSize(datasetId);
+ if (!isAllocatable(required)) {
+ return false;
+ }
+ allocatedMap.put(datasetId, required);
+ available -= required;
+ LOGGER.info(() -> "Allocated(" + required + ") for dataset(" + datasetId + ")");
+ return true;
+ }
+
+ @Override
+ public synchronized void deallocate(int datasetId) {
+ if (!allocatedMap.containsKey(datasetId) && !reservedMap.containsKey(datasetId)) {
+ throw new IllegalStateException("No allocated or reserved memory for dataset: " + datasetId);
+ }
+ final Long allocated = allocatedMap.remove(datasetId);
+ // return the allocated budget if it is not reserved
+ if (allocated != null && !reservedMap.containsKey(datasetId)) {
+ available += allocated;
+ LOGGER.info(() -> "Deallocated(" + allocated + ") from dataset(" + datasetId + ")");
+ }
+ }
+
+ @Override
+ public synchronized boolean reserve(int datasetId) {
+ if (reservedMap.containsKey(datasetId)) {
+ throw new IllegalStateException("Memory is already reserved for dataset: " + datasetId);
+ }
+ final long required = getTotalSize(datasetId);
+ if (!isAllocatable(required) && !allocatedMap.containsKey(datasetId)) {
+ return false;
+ }
+ reservedMap.put(datasetId, required);
+ // if the budget is already allocated, no need to reserve it again
+ if (!allocatedMap.containsKey(datasetId)) {
+ available -= required;
+ }
+ LOGGER.info(() -> "Reserved(" + required + ") for dataset(" + datasetId + ")");
+ return true;
+ }
+
+ @Override
+ public synchronized void cancelReserved(int datasetId) {
+ final Long reserved = reservedMap.remove(datasetId);
+ if (reserved == null) {
+ throw new IllegalStateException("No reserved memory for dataset: " + datasetId);
+ }
+ available += reserved;
+ LOGGER.info(() -> "Cancelled reserved(" + reserved + ") from dataset(" + datasetId + ")");
+ }
+
+ @Override
+ public long getAvailable() {
+ return available;
+ }
+
+ @Override
+ public int getNumPages(int datasetId) {
+ return MetadataIndexImmutableProperties.isMetadataDataset(datasetId) ?
+ storageProperties.getMetadataMemoryComponentNumPages() :
+ storageProperties.getMemoryComponentNumPages();
+ }
+
+ private long getTotalSize(int datasetId) {
+ return storageProperties.getMemoryComponentPageSize() * (long) getNumPages(datasetId);
+ }
+
+ private boolean isAllocatable(long required) {
+ return available - required >= 0;
+ }
+
+ private void allocateReserved(int datasetId) {
+ final Long reserved = reservedMap.get(datasetId);
+ allocatedMap.put(datasetId, reserved);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index f2f3b93..b59fe6a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.IIndex;
@@ -141,4 +142,8 @@
public int getDatasetID() {
return datasetInfo.getDatasetID();
}
+
+ public boolean isMetadataDataset() {
+ return MetadataIndexImmutableProperties.isMetadataDataset(getDatasetID());
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
index c7eda4d..c9b9698 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
@@ -33,51 +33,39 @@
public class DatasetVirtualBufferCaches {
private final int datasetID;
private final StorageProperties storageProperties;
- private final int firstAvilableUserDatasetID;
private final int numPartitions;
+ private final int numPages;
private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
- public DatasetVirtualBufferCaches(int datasetID, StorageProperties storageProperties,
- int firstAvilableUserDatasetID, int numPartitions) {
+ public DatasetVirtualBufferCaches(int datasetID, StorageProperties storageProperties, int numPages,
+ int numPartitions) {
this.datasetID = datasetID;
this.storageProperties = storageProperties;
- this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
this.numPartitions = numPartitions;
- }
-
- public List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
- int ioDeviceNum) {
- int numPages = datasetID < firstAvilableUserDatasetID
- ? storageProperties.getMetadataMemoryComponentNumPages()
- : storageProperties.getMemoryComponentNumPages();
- List<IVirtualBufferCache> vbcs = new ArrayList<>();
- for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
- MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
- new VirtualBufferCache(
- new ResourceHeapBufferAllocator(memoryManager,
- Integer.toString(datasetID)),
- storageProperties.getMemoryComponentPageSize(),
- numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
- vbcs.add(vbc);
- }
- ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
- return vbcs;
+ this.numPages = numPages;
}
public List<IVirtualBufferCache> getVirtualBufferCaches(IResourceMemoryManager memoryManager, int ioDeviceNum) {
synchronized (ioDeviceVirtualBufferCaches) {
List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
if (vbcs == null) {
- vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum);
+ vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum, numPages);
}
return vbcs;
}
}
- public long getTotalSize() {
- int numPages = datasetID < firstAvilableUserDatasetID
- ? storageProperties.getMetadataMemoryComponentNumPages()
- : storageProperties.getMemoryComponentNumPages();
- return storageProperties.getMemoryComponentPageSize() * ((long) numPages);
+ private List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
+ int ioDeviceNum, int numPages) {
+ List<IVirtualBufferCache> vbcs = new ArrayList<>();
+ for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+ MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
+ new VirtualBufferCache(new ResourceHeapBufferAllocator(memoryManager, Integer.toString(datasetID)),
+ storageProperties.getMemoryComponentPageSize(),
+ numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
+ vbcs.add(vbc);
+ }
+ ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
+ return vbcs;
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index 74589cc..8b4c779 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -49,4 +49,8 @@
public String getDatasetName() {
return indexName;
}
+
+ public static boolean isMetadataDataset(int datasetId) {
+ return datasetId < FIRST_AVAILABLE_USER_DATASET_ID;
+ }
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
new file mode 100644
index 0000000..b8e3604
--- /dev/null
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.context;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.context.DatasetMemoryManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class DatasetMemoryManagerTest {
+
+ private static final StorageProperties storageProperties;
+ private static final long GLOBAL_BUDGET = 1000L;
+ private static final long METADATA_DATASET_BUDGET = 200L;
+ private static final long DATASET_BUDGET = 400L;
+
+ static {
+ storageProperties = Mockito.mock(StorageProperties.class);
+ Mockito.when(storageProperties.getMemoryComponentGlobalBudget()).thenReturn(GLOBAL_BUDGET);
+ Mockito.when(storageProperties.getMemoryComponentNumPages()).thenReturn(8);
+ Mockito.when(storageProperties.getMetadataMemoryComponentNumPages()).thenReturn(4);
+ Mockito.when(storageProperties.getMemoryComponentPageSize()).thenReturn(50);
+ Mockito.when(storageProperties.getMemoryComponentsNum()).thenReturn(2);
+ }
+
+ @Test
+ public void allocate() {
+ DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
+ // double allocate
+ Assert.assertTrue(memoryManager.allocate(1));
+ boolean thrown = false;
+ try {
+ memoryManager.allocate(1);
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("already allocated"));
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+
+ // allocate metadata and non-metadata datasets
+ Assert.assertTrue(memoryManager.allocate(400));
+
+ long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET - DATASET_BUDGET;
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+
+ // reserve after allocate shouldn't allocate the budget again
+ Assert.assertTrue(memoryManager.allocate(401));
+ Assert.assertTrue(memoryManager.reserve(401));
+
+ // deallocate should still keep the reserved memory
+ memoryManager.deallocate(401);
+ expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET - (DATASET_BUDGET * 2);
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+
+ // exceed budget should return false
+ Assert.assertFalse(memoryManager.allocate(402));
+ }
+
+ @Test
+ public void reserve() {
+ DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
+ // reserve then allocate budget
+ Assert.assertTrue(memoryManager.reserve(1));
+ Assert.assertTrue(memoryManager.allocate(1));
+ long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET;
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+
+ // double reserve
+ boolean thrown = false;
+ Assert.assertTrue(memoryManager.reserve(2));
+ try {
+ memoryManager.reserve(2);
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("already reserved"));
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+
+ // cancel reserved
+ memoryManager.cancelReserved(2);
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+ }
+
+ @Test
+ public void deallocate() {
+ DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
+ // deallocate reserved
+ Assert.assertTrue(memoryManager.reserve(200));
+ Assert.assertTrue(memoryManager.allocate(200));
+ memoryManager.deallocate(200);
+ long expectedBudget = GLOBAL_BUDGET - DATASET_BUDGET;
+ Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
+
+ // deallocate not allocated
+ boolean thrown = false;
+ try {
+ memoryManager.deallocate(1);
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("No allocated"));
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+
+ // double deallocate
+ memoryManager.allocate(2);
+ memoryManager.deallocate(2);
+ thrown = false;
+ try {
+ memoryManager.deallocate(2);
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("No allocated"));
+ thrown = true;
+ }
+ Assert.assertTrue(thrown);
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index dc38749..a6f1ad0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -302,6 +302,11 @@
*/
public static void enlistMetadataDataset(INCServiceContext ncServiceCtx, IMetadataIndex index)
throws HyracksDataException {
+ final int datasetId = index.getDatasetId().getId();
+ // reserve memory for metadata dataset to ensure it can be opened when needed
+ if (!appContext.getDatasetMemoryManager().reserve(index.getDatasetId().getId())) {
+ throw new IllegalStateException("Failed to reserve memory for metadata dataset (" + datasetId + ")");
+ }
ClusterPartition metadataPartition = appContext.getMetadataProperties().getMetadataPartition();
int metadataDeviceId = metadataPartition.getIODeviceNum();
String metadataPartitionPath = StoragePathUtil.prepareStoragePartitionPath(
@@ -317,20 +322,20 @@
// We are unable to do this since IStorageManager needs a dataset to determine the appropriate
// objects
ILSMOperationTrackerFactory opTrackerFactory =
- index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(index.getDatasetId().getId())
- : new SecondaryIndexOperationTrackerFactory(index.getDatasetId().getId());
+ index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(datasetId)
+ : new SecondaryIndexOperationTrackerFactory(datasetId);
ILSMIOOperationCallbackFactory ioOpCallbackFactory = LSMBTreeIOOperationCallbackFactory.INSTANCE;
IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider();
if (isNewUniverse()) {
LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory(
storageComponentProvider.getStorageManager(), typeTraits, cmpFactories, null, null, null,
opTrackerFactory, ioOpCallbackFactory, storageComponentProvider.getMetadataPageManagerFactory(),
- new AsterixVirtualBufferCacheProvider(index.getDatasetId().getId()),
+ new AsterixVirtualBufferCacheProvider(datasetId),
storageComponentProvider.getIoOperationSchedulerProvider(),
appContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true,
bloomFilterKeyFields, appContext.getBloomFilterFalsePositiveRate(), true, null);
DatasetLocalResourceFactory dsLocalResourceFactory =
- new DatasetLocalResourceFactory(index.getDatasetId().getId(), lsmBtreeFactory);
+ new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory);
// TODO(amoudi) Creating the index should be done through the same code path as other indexes
// This is to be done by having a metadata dataset associated with each index
IIndexBuilder indexBuilder = new IndexBuilder(ncServiceCtx, storageComponentProvider.getStorageManager(),
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java
new file mode 100644
index 0000000..2766c37
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/annotations/ThreadSafe.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * The type to which this annotation is applied is thread-safe.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.SOURCE)
+public @interface ThreadSafe {
+}
\ No newline at end of file