[ASTERIXDB-2715][STO] Dynamic Memory Component Architecture
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
- Introduce a dynamic memory component architecture that uses a global
virtual buffer cache to manage the write memory for all LSM-trees.
- When the overall write memory is nearly full, we flush one dataset
partition at a time in round-robin order. Additionally, we allow
users to configure the maximum size of filtered memory components
to provide better pruning capability.
- Clean up legacy code for statically allocating write memory to each
dataset.
- Remove the following parameters:
storage.metadata.memorycomponent.numpages
storage.max.active.writable.datasets
- Add the following parameters:
storage.memorycomponent.flush.threshold (default: 0.9)
storage.filtered.memorycomponent.max.size (default: 0)
Change-Id: Ia6a0f4de020acd7af89ef630322526c4be5076e0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5824
Reviewed-by: Luo Chen <cluo8@uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 13ca95d..01fa365 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,7 +33,6 @@
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
@@ -50,7 +49,7 @@
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.context.DatasetLifecycleManager;
-import org.apache.asterix.common.context.DatasetMemoryManager;
+import org.apache.asterix.common.context.GlobalVirtualBufferCache;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
@@ -91,6 +90,7 @@
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import org.apache.hyracks.storage.am.lsm.common.impls.ConcurrentMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
@@ -130,9 +130,9 @@
private final MessagingProperties messagingProperties;
private final NodeProperties nodeProperties;
private ExecutorService threadExecutor;
- private IDatasetMemoryManager datasetMemoryManager;
private IDatasetLifecycleManager datasetLifecycleManager;
private IBufferCache bufferCache;
+ private IVirtualBufferCache virtualBufferCache;
private ITransactionSubsystem txnSubsystem;
private IMetadataNode metadataNodeStub;
private ILSMIOOperationScheduler lsmIOScheduler;
@@ -205,10 +205,13 @@
}
localResourceRepository.deleteStorageData();
}
- datasetMemoryManager = new DatasetMemoryManager(storageProperties);
+ virtualBufferCache = new GlobalVirtualBufferCache(allocator, storageProperties);
+ // The vbc must be started here rather than by the life cycle component manager (lccm), because the lccm
+ // runs after the metadata bootstrap task
+ ((ILifeCycleComponent) virtualBufferCache).start();
datasetLifecycleManager =
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
- datasetMemoryManager, indexCheckpointManagerProvider, ioManager.getIODevices().size());
+ virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
final Set<Integer> nodePartitionsIds =
@@ -246,6 +249,7 @@
* managers. Notes: registered components are stopped in reversed order
*/
ILifeCycleComponentManager lccm = getServiceContext().getLifeCycleComponentManager();
+ lccm.register((ILifeCycleComponent) virtualBufferCache);
lccm.register((ILifeCycleComponent) bufferCache);
/*
* LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager
@@ -297,6 +301,11 @@
}
@Override
+ public IVirtualBufferCache getVirtualBufferCache() {
+ return virtualBufferCache;
+ }
+
+ @Override
public ITransactionSubsystem getTransactionSubsystem() {
return txnSubsystem;
}
@@ -307,11 +316,6 @@
}
@Override
- public IDatasetMemoryManager getDatasetMemoryManager() {
- return datasetMemoryManager;
- }
-
- @Override
public ILSMIOOperationScheduler getLSMIOScheduler() {
return lsmIOScheduler;
}
diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf
index 1b8e034..d85a5ef 100644
--- a/asterixdb/asterix-app/src/main/resources/cc.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc.conf
@@ -39,6 +39,7 @@
storage.buffercache.size=128MB
storage.memorycomponent.globalbudget=512MB
storage.io.scheduler=greedy
+storage.filtered.memorycomponent.max.size=16MB
[cc]
address = 127.0.0.1
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 9f37c9b..ebf98df 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -691,7 +691,8 @@
this.mergePolicyFactory = mergePolicyFactory;
this.mergePolicyProperties = mergePolicyProperties;
this.primaryKeyIndexes = primaryKeyIndexes;
- primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1));
+ primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1))
+ + (filterFields != null ? filterFields.length : 0);
primaryIndexTypeTraits =
createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
primaryIndexSerdes =
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
new file mode 100644
index 0000000..0b07bc4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalVirtualBufferCacheTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.StorageProperties.Option;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.comm.FixedSizeFrame;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.utils.TupleUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class GlobalVirtualBufferCacheTest {
+ public static final Logger LOGGER = LogManager.getLogger();
+ private static TestNodeController nc;
+ private static Dataset dataset;
+ private static Dataset filteredDataset;
+ private static PrimaryIndexInfo[] primaryIndexInfos;
+ private static PrimaryIndexInfo[] filteredPrimaryIndexInfos;
+ private static TestLsmBtree[] primaryIndexes;
+ private static TestLsmBtree[] filteredPrimaryIndexes;
+ private static NCAppRuntimeContext ncAppCtx;
+ private static IDatasetLifecycleManager dsLifecycleMgr;
+
+ private static IHyracksTaskContext[] testCtxs;
+ private static IHyracksTaskContext[] filteredTestCtxs;
+ private static IIndexDataflowHelper[] primaryIndexDataflowHelpers;
+ private static IIndexDataflowHelper[] filteredPrimaryIndexDataflowHelpers;
+ private static ITransactionContext txnCtx;
+ private static ITransactionContext filteredTxnCtx;
+ private static RecordTupleGenerator tupleGenerator;
+
+ private static final int NUM_PARTITIONS = 2;
+ private static final long FILTERED_MEMORY_COMPONENT_SIZE = 16 * 1024l;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+ + File.separator + "resources" + File.separator + "cc.conf";
+ nc = new TestNodeController(configPath, false);
+ }
+
+ @Before
+ public void initializeTest() throws Exception {
+ // initialize NC before each test
+ initializeNc();
+ initializeTestCtx();
+ createIndex();
+ readIndex();
+ tupleGenerator = StorageTestUtils.getTupleGenerator();
+ }
+
+ @After
+ public void deinitializeTest() throws Exception {
+ dropIndex();
+ // cleanup after each test case
+ nc.deInit(true);
+ nc.clearOpts();
+ }
+
+ @Test
+ public void testFlushes() throws Exception {
+ List<Thread> threads = new ArrayList<>();
+ int records = 16 * 1024;
+ int threadsPerPartition = 2;
+ AtomicReference<Exception> exceptionRef = new AtomicReference<>();
+ for (int p = 0; p < NUM_PARTITIONS; p++) {
+ for (int t = 0; t < threadsPerPartition; t++) {
+ threads.add(insertRecords(records, p, false, exceptionRef));
+ threads.add(insertRecords(records, p, true, exceptionRef));
+ }
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ if (exceptionRef.get() != null) {
+ exceptionRef.get().printStackTrace();
+ Assert.fail();
+ }
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ Assert.assertFalse(primaryIndexes[i].getDiskComponents().isEmpty());
+ Assert.assertTrue(
+ primaryIndexes[i].getDiskComponents().stream().anyMatch(c -> ((AbstractTreeIndex) c.getIndex())
+ .getFileReference().getFile().length() > FILTERED_MEMORY_COMPONENT_SIZE));
+
+ Assert.assertFalse(filteredPrimaryIndexes[i].getDiskComponents().isEmpty());
+ Assert.assertTrue(filteredPrimaryIndexes[i].getDiskComponents().stream()
+ .allMatch(c -> ((AbstractTreeIndex) c.getIndex()).getFileReference().getFile()
+ .length() <= FILTERED_MEMORY_COMPONENT_SIZE));
+ }
+
+ nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+ nc.getTransactionManager().commitTransaction(filteredTxnCtx.getTxnId());
+ }
+
+ private void initializeNc() throws Exception {
+ List<Pair<IOption, Object>> opts = new ArrayList<>();
+ opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 128 * 1024L));
+ opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_PAGESIZE, 1 * 1024));
+ opts.add(Pair.of(Option.STORAGE_BUFFERCACHE_PAGESIZE, 1 * 1024));
+ opts.add(Pair.of(Option.STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE, FILTERED_MEMORY_COMPONENT_SIZE));
+ opts.add(Pair.of(Option.STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE, FILTERED_MEMORY_COMPONENT_SIZE));
+
+ nc.setOpts(opts);
+
+ nc.init(true);
+ ncAppCtx = nc.getAppRuntimeContext();
+ dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+ }
+
+ private void createIndex() throws Exception {
+ dataset = new TestDataset(StorageTestUtils.DATAVERSE_NAME, "ds", StorageTestUtils.DATAVERSE_NAME,
+ StorageTestUtils.DATA_TYPE_NAME, StorageTestUtils.NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ StorageTestUtils.PARTITIONING_KEYS, null, null, null, false, null),
+ null, DatasetType.INTERNAL, StorageTestUtils.DATASET_ID, 0);
+
+ filteredDataset = new TestDataset(StorageTestUtils.DATAVERSE_NAME, "filtered_ds",
+ StorageTestUtils.DATAVERSE_NAME, StorageTestUtils.DATA_TYPE_NAME, StorageTestUtils.NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null,
+ new InternalDatasetDetails(null, PartitioningStrategy.HASH, StorageTestUtils.PARTITIONING_KEYS, null,
+ null, null, false, Collections.singletonList("value")),
+ null, DatasetType.INTERNAL, StorageTestUtils.DATASET_ID + 1, 0);
+
+ primaryIndexInfos = new PrimaryIndexInfo[NUM_PARTITIONS];
+ filteredPrimaryIndexInfos = new PrimaryIndexInfo[NUM_PARTITIONS];
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ primaryIndexInfos[i] = StorageTestUtils.createPrimaryIndex(nc, dataset, i);
+ filteredPrimaryIndexInfos[i] = StorageTestUtils.createPrimaryIndex(nc, filteredDataset, i);
+ }
+ }
+
+ private void initializeTestCtx() throws Exception {
+ JobId jobId = nc.newJobId();
+ JobId filteredJobId = nc.newJobId();
+
+ testCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
+ filteredTestCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
+
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ testCtxs[i] = nc.createTestContext(jobId, i, false);
+ filteredTestCtxs[i] = nc.createTestContext(filteredJobId, i, false);
+ }
+ txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ filteredTxnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(filteredJobId),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ }
+
+ private void readIndex() throws HyracksDataException {
+ primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+ primaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
+
+ filteredPrimaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
+ filteredPrimaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
+
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ IIndexDataflowHelperFactory factory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfos[i].getFileSplitProvider());
+ primaryIndexDataflowHelpers[i] = factory.create(testCtxs[i].getJobletContext().getServiceContext(), i);
+ primaryIndexDataflowHelpers[i].open();
+ primaryIndexes[i] = (TestLsmBtree) primaryIndexDataflowHelpers[i].getIndexInstance();
+ primaryIndexDataflowHelpers[i].close();
+
+ IIndexDataflowHelperFactory filteredFactory = new IndexDataflowHelperFactory(nc.getStorageManager(),
+ filteredPrimaryIndexInfos[i].getFileSplitProvider());
+ filteredPrimaryIndexDataflowHelpers[i] =
+ filteredFactory.create(filteredTestCtxs[i].getJobletContext().getServiceContext(), i);
+ filteredPrimaryIndexDataflowHelpers[i].open();
+ filteredPrimaryIndexes[i] = (TestLsmBtree) filteredPrimaryIndexDataflowHelpers[i].getIndexInstance();
+ filteredPrimaryIndexDataflowHelpers[i].close();
+ }
+ }
+
+ private void dropIndex() throws HyracksDataException {
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ primaryIndexDataflowHelpers[i].destroy();
+ filteredPrimaryIndexDataflowHelpers[i].destroy();
+ }
+ }
+
+ private Thread insertRecords(int records, int partition, boolean filtered, AtomicReference<Exception> exceptionRef)
+ throws Exception {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LSMPrimaryInsertOperatorNodePushable insertOp = filtered ? nc
+ .getInsertPipeline(filteredTestCtxs[partition], filteredDataset, StorageTestUtils.KEY_TYPES,
+ StorageTestUtils.RECORD_TYPE, StorageTestUtils.META_TYPE,
+ filteredPrimaryIndexes[partition].getFilterFields(), StorageTestUtils.KEY_INDEXES,
+ StorageTestUtils.KEY_INDICATORS_LIST, StorageTestUtils.STORAGE_MANAGER, null, null)
+ .getLeft()
+ : nc.getInsertPipeline(testCtxs[partition], dataset, StorageTestUtils.KEY_TYPES,
+ StorageTestUtils.RECORD_TYPE, StorageTestUtils.META_TYPE, null,
+ StorageTestUtils.KEY_INDEXES, StorageTestUtils.KEY_INDICATORS_LIST,
+ StorageTestUtils.STORAGE_MANAGER, null, null).getLeft();
+ insertOp.open();
+ FrameTupleAppender tupleAppender =
+ new FrameTupleAppender(new FixedSizeFrame(ByteBuffer.allocate(512)));
+
+ ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(filtered ? 3 : 2);
+ ArrayTupleReference tupleRef = new ArrayTupleReference();
+ for (int i = 0; i < records; i++) {
+ synchronized (tupleGenerator) {
+ ITupleReference tuple = tupleGenerator.next();
+ TupleUtils.copyTuple(tupleBuilder, tuple, 2);
+ if (filtered) {
+ // append the filter field
+ tupleBuilder.getDataOutput().writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+ tupleBuilder.getDataOutput().writeLong(0l);
+ tupleBuilder.addFieldEndOffset();
+ }
+ tupleRef.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+ }
+ DataflowUtils.addTupleToFrame(tupleAppender, tupleRef, insertOp);
+ }
+ tupleAppender.write(insertOp, true);
+ insertOp.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ exceptionRef.compareAndSet(null, e);
+ }
+ }
+ });
+ thread.start();
+ return thread;
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index 03ca1f0..fe73baf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -132,9 +132,6 @@
List<Pair<IOption, Object>> opts = new ArrayList<>();
opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 20 * 1024 * 1024L));
opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_PAGESIZE, 1 * 1024));
- // each memory component only gets 4 pages (we have 2 partitions, 2 memory components/partition)
- // and some reserved memory for metadata dataset
- opts.add(Pair.of(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, 1024));
nc.setOpts(opts);
initializeNc(false);
initializeTestCtx();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 1e1df54..d43b6d5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -35,6 +35,7 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.StorageProperties.Option;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
@@ -51,7 +52,9 @@
import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable;
import org.apache.asterix.test.common.TestHelper;
import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
@@ -132,6 +135,9 @@
String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+ File.separator + "resources" + File.separator + "cc-multipart.conf";
nc = new TestNodeController(configPath, false);
+ List<Pair<IOption, Object>> opts = new ArrayList<>();
+ opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 16 * 1024 * 1024L));
+ nc.setOpts(opts);
nc.init();
ncAppCtx = nc.getAppRuntimeContext();
dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
@@ -262,20 +268,23 @@
MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
primaryLsmBtrees[0].addVirtuablBufferCacheCallback(new IVirtualBufferCacheCallback() {
@Override
- public void isFullChanged(boolean newValue) {
- synchronized (isFull) {
- isFull.set(newValue);
- isFull.notifyAll();
- }
- synchronized (proceedToScheduleFlush) {
- while (!proceedToScheduleFlush.booleanValue()) {
- try {
- proceedToScheduleFlush.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
+ public void isFullChanged(boolean newValue, ILSMMemoryComponent memoryComponent) {
+ if (memoryComponent != null && memoryComponent.getLsmIndex() == primaryLsmBtrees[0]) {
+ synchronized (isFull) {
+ isFull.set(newValue);
+ isFull.notifyAll();
+ }
+ synchronized (proceedToScheduleFlush) {
+ while (!proceedToScheduleFlush.booleanValue()) {
+ try {
+ proceedToScheduleFlush.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
}
}
+ System.out.println("Proceed to flush");
}
}
});
@@ -350,7 +359,7 @@
// Now we need to know that the flush has been scheduled
synchronized (flushStarted) {
while (!flushStarted.booleanValue()) {
- flushStarted.wait();
+ flushStarted.wait(100);
}
}
@@ -433,18 +442,20 @@
MutableBoolean proceedAfterIsFullChanged = new MutableBoolean(false);
primaryLsmBtrees[1].addVirtuablBufferCacheCallback(new IVirtualBufferCacheCallback() {
@Override
- public void isFullChanged(boolean newValue) {
- synchronized (isFull) {
- isFull.set(newValue);
- isFull.notifyAll();
- }
- synchronized (proceedAfterIsFullChanged) {
- while (!proceedAfterIsFullChanged.booleanValue()) {
- try {
- proceedAfterIsFullChanged.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
+ public void isFullChanged(boolean newValue, ILSMMemoryComponent memoryComponent) {
+ if (memoryComponent != null && memoryComponent.getLsmIndex() == primaryLsmBtrees[1]) {
+ synchronized (isFull) {
+ isFull.set(newValue);
+ isFull.notifyAll();
+ }
+ synchronized (proceedAfterIsFullChanged) {
+ while (!proceedAfterIsFullChanged.booleanValue()) {
+ try {
+ proceedAfterIsFullChanged.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
}
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
index e752493..3d5dad8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ClusterStateDefaultParameterTest.java
@@ -87,12 +87,16 @@
Assert.assertTrue(getValue(row) == maxHeap / 4);
matchCount++;
}
- if (row.contains("storage.max.active.writable.datasets")) {
- Assert.assertTrue(getValue(row) == 8);
+ if (row.contains("storage.memorycomponent.flush.threshold")) {
+ Assert.assertTrue(getDoubleValue(row) == 0.9d);
+ matchCount++;
+ }
+ if (row.contains("storage.filtered.memorycomponent.max.size")) {
+ Assert.assertTrue(getValue(row) == 0);
matchCount++;
}
}
- Assert.assertTrue(matchCount == 3);
+ Assert.assertTrue(matchCount == 4);
}
// Parses a long value parameter.
@@ -100,4 +104,10 @@
String valueStr = row.split(":")[1].trim();
return Long.parseLong(valueStr);
}
+
+ // Parses a double value parameter.
+ private double getDoubleValue(String row) {
+ String valueStr = row.split(":")[1].trim();
+ return Double.parseDouble(valueStr);
+ }
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
index 65b10a0..59d7fae 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
@@ -22,7 +22,6 @@
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.TestDataUtil;
-import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
import org.junit.After;
import org.junit.Assert;
@@ -42,7 +41,6 @@
@Before
public void setUp() throws Exception {
- integrationUtil.addOption(StorageProperties.Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, 20);
integrationUtil.setGracefulShutdown(false);
integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
index b456155..562dedd 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
@@ -55,5 +55,4 @@
txn.log.partitionsize=2MB
txn.log.buffer.pagesize=128KB
txn.log.checkpoint.pollfrequency=2147483647
-txn.log.checkpoint.history=0
-storage.max.active.writable.datasets=50
\ No newline at end of file
+txn.log.checkpoint.history=0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index acfc5e0..50a901f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -37,7 +37,6 @@
"replication\.strategy" : "none",
"replication\.timeout" : 30,
"ssl\.enabled" : false,
- "storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 6d28223..318206d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -37,7 +37,6 @@
"replication\.strategy" : "none",
"replication\.timeout" : 30,
"ssl\.enabled" : false,
- "storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 0724b25..69e9b0e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -37,7 +37,6 @@
"replication\.strategy" : "none",
"replication\.timeout" : 30,
"ssl\.enabled" : false,
- "storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
"txn\.dataset\.checkpoint\.interval" : 3600,
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index 2d354a7..ae2e77a 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -295,5 +295,9 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
</dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
deleted file mode 100644
index fde2c80..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetMemoryManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.api;
-
-public interface IDatasetMemoryManager {
-
- /**
- * Allocates memory for dataset {@code datasetId}.
- *
- * @param datasetId
- * @return true, if the allocation is successful, otherwise false.
- */
- boolean allocate(int datasetId);
-
- /**
- * Deallocates memory of dataset {@code datasetId}.
- *
- * @param datasetId
- */
- void deallocate(int datasetId);
-
- /**
- * Reserves memory for dataset {@code datasetId}. The reserved memory
- * is guaranteed to be allocatable when needed for the dataset. Reserve
- * maybe called after allocation to reserve the allocated budget
- * on deallocation.
- *
- * @param datasetId
- * @return true, if the allocation is successful, otherwise false.
- */
- boolean reserve(int datasetId);
-
- /**
- * Cancels the reserved memory for dataset {@code datasetId}.
- *
- * @param datasetId
- */
- void cancelReserved(int datasetId);
-
- /**
- * @return The remaining memory budget that can be used for datasets.
- */
- long getAvailable();
-
- /**
- * @param datasetId
- * @return The number of virtual buffer cache pages that should be allocated for dataset {@code datasetId}.
- */
- int getNumPages(int datasetId);
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 79828e1..eed186c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
@@ -60,12 +61,12 @@
IBufferCache getBufferCache();
+ IVirtualBufferCache getVirtualBufferCache();
+
ILocalResourceRepository getLocalResourceRepository();
IDatasetLifecycleManager getDatasetLifecycleManager();
- IDatasetMemoryManager getDatasetMemoryManager();
-
IResourceIdFactory getResourceIdFactory();
void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 58bc828..958d5ae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -47,9 +47,9 @@
STORAGE_MEMORYCOMPONENT_GLOBALBUDGET(LONG_BYTE_UNIT, Runtime.getRuntime().maxMemory() / 4),
STORAGE_MEMORYCOMPONENT_PAGESIZE(INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(128, KILOBYTE)),
STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS(POSITIVE_INTEGER, 2),
- STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES(POSITIVE_INTEGER, 8),
+ STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD(DOUBLE, 0.9d),
+ STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE(LONG_BYTE_UNIT, 0L),
STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
- STORAGE_MAX_ACTIVE_WRITABLE_DATASETS(UNSIGNED_INTEGER, 8),
STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
STORAGE_IO_SCHEDULER(STRING, "greedy");
@@ -64,9 +64,6 @@
@Override
public Section section() {
- if (this == STORAGE_MAX_ACTIVE_WRITABLE_DATASETS) {
- return Section.COMMON;
- }
return Section.NC;
}
@@ -87,12 +84,13 @@
return "The page size in bytes for pages allocated to memory components";
case STORAGE_MEMORYCOMPONENT_NUMCOMPONENTS:
return "The number of memory components to be used per lsm index";
- case STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES:
- return "The number of pages to allocate for a metadata memory component";
+ case STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD:
+ return "The memory usage threshold when memory components should be flushed";
+ case STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE:
+ return "The maximum size of a filtered memory component. 0 means that the memory component "
+ + "does not have a maximum size";
case STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE:
return "The maximum acceptable false positive rate for bloom filters associated with LSM indexes";
- case STORAGE_MAX_ACTIVE_WRITABLE_DATASETS:
- return "The maximum number of datasets that can be concurrently modified";
case STORAGE_COMPRESSION_BLOCK:
return "The default compression scheme for the storage";
case STORAGE_DISK_FORCE_BYTES:
@@ -116,9 +114,6 @@
@Override
public String usageDefaultOverride(IApplicationConfig accessor, Function<IOption, String> optionPrinter) {
- if (this == STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES) {
- return "8 pages";
- }
return null;
}
}
@@ -145,16 +140,13 @@
return accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_PAGESIZE);
}
- public int getMemoryComponentNumPages() {
- final long metadataReservedMem = getMetadataReservedMemory();
- final long globalUserDatasetMem = getMemoryComponentGlobalBudget() - metadataReservedMem;
- final long userDatasetMem =
- globalUserDatasetMem / (getMaxActiveWritableDatasets() + geSystemReservedDatasets());
- return (int) (userDatasetMem / getMemoryComponentPageSize());
+ public double getMemoryComponentFlushThreshold() {
+ return accessor.getDouble(Option.STORAGE_MEMORYCOMPONENT_FLUSH_THRESHOLD);
}
- public int getMetadataMemoryComponentNumPages() {
- return accessor.getInt(Option.STORAGE_METADATA_MEMORYCOMPONENT_NUMPAGES);
+ public int getFilteredMemoryComponentMaxNumPages() {
+ return (int) (accessor.getLong(Option.STORAGE_FILTERED_MEMORYCOMPONENT_MAX_SIZE)
+ / getMemoryComponentPageSize());
}
public int getMemoryComponentsNum() {
@@ -186,10 +178,6 @@
return jobExecutionMemory;
}
- public int getMaxActiveWritableDatasets() {
- return accessor.getInt(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS);
- }
-
public String getCompressionScheme() {
return accessor.getString(Option.STORAGE_COMPRESSION_BLOCK);
}
@@ -206,10 +194,6 @@
return SYSTEM_RESERVED_DATASETS;
}
- private long getMetadataReservedMemory() {
- return (getMetadataMemoryComponentNumPages() * (long) getMemoryComponentPageSize()) * getMetadataDatasets();
- }
-
public int getDiskForcePages() {
return (int) (accessor.getLong(Option.STORAGE_DISK_FORCE_BYTES) / getBufferCachePageSize());
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 4ccb0cc..3fcc528 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -45,7 +45,6 @@
private long lastAccess;
private boolean isExternal;
private boolean isRegistered;
- private boolean memoryAllocated;
private boolean durable;
public DatasetInfo(int datasetID, ILogManager logManager) {
@@ -54,7 +53,6 @@
this.setLastAccess(-1);
this.datasetID = datasetID;
this.setRegistered(false);
- this.setMemoryAllocated(false);
this.logManager = logManager;
waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
@@ -144,8 +142,8 @@
@Override
public String toString() {
return "DatasetID: " + getDatasetID() + ", isOpen: " + isOpen() + ", refCount: " + getReferenceCount()
- + ", lastAccess: " + getLastAccess() + ", isRegistered: " + isRegistered() + ", memoryAllocated: "
- + isMemoryAllocated() + ", isDurable: " + isDurable();
+ + ", lastAccess: " + getLastAccess() + ", isRegistered: " + isRegistered() + ", isDurable: "
+ + isDurable();
}
public boolean isDurable() {
@@ -192,14 +190,6 @@
return datasetID;
}
- public boolean isMemoryAllocated() {
- return memoryAllocated;
- }
-
- public void setMemoryAllocated(boolean memoryAllocated) {
- this.memoryAllocated = memoryAllocated;
- }
-
public long getLastAccess() {
return lastAccess;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index d396d9b..0750749 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -18,25 +18,21 @@
*/
package org.apache.asterix.common.context;
-import static org.apache.asterix.common.metadata.MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS;
import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -67,22 +63,27 @@
private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>();
private final StorageProperties storageProperties;
private final ILocalResourceRepository resourceRepository;
- private final IDatasetMemoryManager memoryManager;
+ private final IVirtualBufferCache vbc;
private final ILogManager logManager;
private final LogRecord waitLog;
- private final int numPartitions;
private volatile boolean stopped = false;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+ // shared by all LSM-trees: one entry per memory component, each referencing the same vbc instance
+ private final List<IVirtualBufferCache> vbcs;
public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
- ILogManager logManager, IDatasetMemoryManager memoryManager,
+ ILogManager logManager, IVirtualBufferCache vbc,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider, int numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
- this.memoryManager = memoryManager;
+ this.vbc = vbc;
+ int numMemoryComponents = storageProperties.getMemoryComponentsNum();
+ this.vbcs = new ArrayList<>(numMemoryComponents);
+ for (int i = 0; i < numMemoryComponents; i++) {
+ vbcs.add(vbc);
+ }
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
- this.numPartitions = numPartitions;
waitLog = new LogRecord();
waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
@@ -204,38 +205,6 @@
iInfo.touch();
}
- private boolean evictCandidateDataset() throws HyracksDataException {
- /**
- * We will take a dataset that has no active transactions, it is open (a dataset consuming memory),
- * that is not being used (refcount == 0) and has been least recently used, excluding metadata datasets.
- * The sort order defined for DatasetInfo maintains this. See DatasetInfo.compareTo().
- */
- List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values());
- Collections.sort(datasetsResources);
- for (DatasetResource dsr : datasetsResources) {
- if (isCandidateDatasetForEviction(dsr)) {
- closeDataset(dsr);
- LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
- return true;
- }
- }
- return false;
- }
-
- private boolean isCandidateDatasetForEviction(DatasetResource dsr) {
- for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
- if (opTracker.getNumActiveOperations() != 0) {
- return false;
- }
- }
- if (dsr.getDatasetInfo().getReferenceCount() != 0 || !dsr.getDatasetInfo().isOpen()
- || dsr.isMetadataDataset()) {
- return false;
- }
-
- return true;
- }
-
public DatasetResource getDatasetLifecycle(int did) {
DatasetResource dsr = datasets.get(did);
if (dsr != null) {
@@ -245,11 +214,7 @@
dsr = datasets.get(did);
if (dsr == null) {
DatasetInfo dsInfo = new DatasetInfo(did, logManager);
- int partitions = MetadataIndexImmutableProperties.isMetadataDataset(did) ? METADATA_DATASETS_PARTITIONS
- : numPartitions;
- DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
- memoryManager.getNumPages(did), partitions);
- dsr = new DatasetResource(dsInfo, vbcs);
+ dsr = new DatasetResource(dsInfo);
datasets.put(did, dsr);
}
return dsr;
@@ -312,24 +277,18 @@
return openIndexesInfo;
}
- private DatasetVirtualBufferCaches getVirtualBufferCaches(int datasetID) {
- return getDatasetLifecycle(datasetID).getVirtualBufferCaches();
- }
-
@Override
public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID, int ioDeviceNum) {
- DatasetVirtualBufferCaches dvbcs = getVirtualBufferCaches(datasetID);
- return dvbcs.getVirtualBufferCaches(this, ioDeviceNum);
+ return vbcs;
}
private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
- deallocateDatasetMemory(datasetID);
datasets.remove(datasetID);
}
@Override
public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition, String path) {
- DatasetResource dataset = datasets.get(datasetId);
+ DatasetResource dataset = getDatasetLifecycle(datasetId);
PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
if (opTracker == null) {
populateOpTrackerAndIdGenerator(dataset, partition, path);
@@ -513,7 +472,9 @@
StringBuilder sb = new StringBuilder();
sb.append(String.format("Memory budget = %d%n", storageProperties.getMemoryComponentGlobalBudget()));
- sb.append(String.format("Memory available = %d%n", memoryManager.getAvailable()));
+ long availableMemory = storageProperties.getMemoryComponentGlobalBudget()
+ - (long) vbc.getUsage() * storageProperties.getMemoryComponentPageSize();
+ sb.append(String.format("Memory available = %d%n", availableMemory));
sb.append("\n");
String dsHeaderFormat = "%-10s %-6s %-16s %-12s\n";
@@ -540,51 +501,6 @@
outputStream.write(sb.toString().getBytes());
}
- private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
- DatasetResource dsr = datasets.get(datasetId);
- if (dsr == null) {
- throw new HyracksDataException(
- "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
- }
- DatasetInfo dsInfo = dsr.getDatasetInfo();
- if (dsInfo == null) {
- throw new HyracksDataException(
- "Failed to deallocate memory for dataset with ID " + datasetId + " since it is not open.");
- }
- synchronized (dsInfo) {
- if (dsInfo.isOpen() && dsInfo.isMemoryAllocated()) {
- memoryManager.deallocate(datasetId);
- dsInfo.setMemoryAllocated(false);
- }
- }
- }
-
- @Override
- public synchronized void allocateMemory(String resourcePath) throws HyracksDataException {
- //a resource name in the case of DatasetLifecycleManager is a dataset id which is passed to the ResourceHeapBufferAllocator.
- int datasetId = Integer.parseInt(resourcePath);
- DatasetResource dsr = datasets.get(datasetId);
- if (dsr == null) {
- throw new HyracksDataException(
- "Failed to allocate memory for dataset with ID " + datasetId + " since it is not open.");
- }
- DatasetInfo dsInfo = dsr.getDatasetInfo();
- synchronized (dsInfo) {
- // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
- if (!dsInfo.isMemoryAllocated() && !dsInfo.isExternal()) {
- while (!memoryManager.allocate(datasetId)) {
- if (!evictCandidateDataset()) {
- LOGGER.warn("failed to allocate memory for dataset {}. Currently allocated {}",
- dsInfo::getDatasetID, ((DatasetMemoryManager) memoryManager)::getState);
- throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
- + " memory since memory budget would be exceeded.");
- }
- }
- dsInfo.setMemoryAllocated(true);
- }
- }
- }
-
@Override
public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
deleted file mode 100644
index ded6fb9..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetMemoryManager.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.asterix.common.api.IDatasetMemoryManager;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.hyracks.util.JSONUtil;
-import org.apache.hyracks.util.annotations.ThreadSafe;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-@ThreadSafe
-public class DatasetMemoryManager implements IDatasetMemoryManager {
-
- private static final Logger LOGGER = LogManager.getLogger();
- private final Map<Integer, Long> allocatedMap = new HashMap<>();
- private final Map<Integer, Long> reservedMap = new HashMap<>();
- private long available;
- private final StorageProperties storageProperties;
-
- public DatasetMemoryManager(StorageProperties storageProperties) {
- this.storageProperties = storageProperties;
- available = storageProperties.getMemoryComponentGlobalBudget();
- }
-
- @Override
- public synchronized boolean allocate(int datasetId) {
- if (allocatedMap.containsKey(datasetId)) {
- throw new IllegalStateException("Memory is already allocated for dataset: " + datasetId);
- }
- if (reservedMap.containsKey(datasetId)) {
- allocateReserved(datasetId);
- return true;
- }
- final long required = getTotalSize(datasetId);
- if (!isAllocatable(required)) {
- return false;
- }
- allocatedMap.put(datasetId, required);
- available -= required;
- LOGGER.info(() -> "Allocated(" + required + ") for dataset(" + datasetId + ")");
- return true;
- }
-
- @Override
- public synchronized void deallocate(int datasetId) {
- if (!allocatedMap.containsKey(datasetId) && !reservedMap.containsKey(datasetId)) {
- throw new IllegalStateException("No allocated or reserved memory for dataset: " + datasetId);
- }
- final Long allocated = allocatedMap.remove(datasetId);
- // return the allocated budget if it is not reserved
- if (allocated != null && !reservedMap.containsKey(datasetId)) {
- available += allocated;
- LOGGER.info(() -> "Deallocated(" + allocated + ") from dataset(" + datasetId + ")");
- }
- }
-
- @Override
- public synchronized boolean reserve(int datasetId) {
- if (reservedMap.containsKey(datasetId)) {
- LOGGER.info("Memory is already reserved for dataset: {}", () -> datasetId);
- return true;
- }
- final long required = getTotalSize(datasetId);
- if (!isAllocatable(required) && !allocatedMap.containsKey(datasetId)) {
- return false;
- }
- reservedMap.put(datasetId, required);
- // if the budget is already allocated, no need to reserve it again
- if (!allocatedMap.containsKey(datasetId)) {
- available -= required;
- }
- LOGGER.info(() -> "Reserved(" + required + ") for dataset(" + datasetId + ")");
- return true;
- }
-
- @Override
- public synchronized void cancelReserved(int datasetId) {
- final Long reserved = reservedMap.remove(datasetId);
- if (reserved == null) {
- throw new IllegalStateException("No reserved memory for dataset: " + datasetId);
- }
- available += reserved;
- LOGGER.info(() -> "Cancelled reserved(" + reserved + ") from dataset(" + datasetId + ")");
- }
-
- @Override
- public long getAvailable() {
- return available;
- }
-
- @Override
- public int getNumPages(int datasetId) {
- return MetadataIndexImmutableProperties.isMetadataDataset(datasetId)
- ? storageProperties.getMetadataMemoryComponentNumPages()
- : storageProperties.getMemoryComponentNumPages();
- }
-
- public JsonNode getState() {
- final ObjectNode state = JSONUtil.createObject();
- state.put("availableBudget", available);
- state.set("allocated", budgetMapToJsonArray(allocatedMap));
- state.set("reserved", budgetMapToJsonArray(reservedMap));
- return state;
- }
-
- private long getTotalSize(int datasetId) {
- return storageProperties.getMemoryComponentPageSize() * (long) getNumPages(datasetId);
- }
-
- private boolean isAllocatable(long required) {
- return available - required >= 0;
- }
-
- private void allocateReserved(int datasetId) {
- final Long reserved = reservedMap.get(datasetId);
- allocatedMap.put(datasetId, reserved);
- }
-
- private static ArrayNode budgetMapToJsonArray(Map<Integer, Long> memorytMap) {
- final ArrayNode array = JSONUtil.createArray();
- memorytMap.forEach((k, v) -> {
- final ObjectNode dataset = JSONUtil.createObject();
- dataset.put("datasetId", k);
- dataset.put("budget", v);
- array.add(dataset);
- });
- return array;
- }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 8dcae23..8844d41 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -43,14 +43,12 @@
*/
public class DatasetResource implements Comparable<DatasetResource> {
private final DatasetInfo datasetInfo;
- private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
- public DatasetResource(DatasetInfo datasetInfo, DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
+ public DatasetResource(DatasetInfo datasetInfo) {
this.datasetInfo = datasetInfo;
- this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
this.datasetPrimaryOpTrackers = new HashMap<>();
this.datasetComponentIdGenerators = new HashMap<>();
}
@@ -83,10 +81,6 @@
datasetInfo.untouch();
}
- public DatasetVirtualBufferCaches getVirtualBufferCaches() {
- return datasetVirtualBufferCaches;
- }
-
public ILSMIndex getIndex(long resourceID) {
IndexInfo iInfo = getIndexInfo(resourceID);
return (iInfo == null) ? null : iInfo.getIndex();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
deleted file mode 100644
index c9b9698..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetVirtualBufferCaches.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.context;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
-import org.apache.hyracks.storage.common.IResourceMemoryManager;
-import org.apache.hyracks.storage.common.buffercache.ResourceHeapBufferAllocator;
-
-public class DatasetVirtualBufferCaches {
- private final int datasetID;
- private final StorageProperties storageProperties;
- private final int numPartitions;
- private final int numPages;
- private final Map<Integer, List<IVirtualBufferCache>> ioDeviceVirtualBufferCaches = new HashMap<>();
-
- public DatasetVirtualBufferCaches(int datasetID, StorageProperties storageProperties, int numPages,
- int numPartitions) {
- this.datasetID = datasetID;
- this.storageProperties = storageProperties;
- this.numPartitions = numPartitions;
- this.numPages = numPages;
- }
-
- public List<IVirtualBufferCache> getVirtualBufferCaches(IResourceMemoryManager memoryManager, int ioDeviceNum) {
- synchronized (ioDeviceVirtualBufferCaches) {
- List<IVirtualBufferCache> vbcs = ioDeviceVirtualBufferCaches.get(ioDeviceNum);
- if (vbcs == null) {
- vbcs = initializeVirtualBufferCaches(memoryManager, ioDeviceNum, numPages);
- }
- return vbcs;
- }
- }
-
- private List<IVirtualBufferCache> initializeVirtualBufferCaches(IResourceMemoryManager memoryManager,
- int ioDeviceNum, int numPages) {
- List<IVirtualBufferCache> vbcs = new ArrayList<>();
- for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
- MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
- new VirtualBufferCache(new ResourceHeapBufferAllocator(memoryManager, Integer.toString(datasetID)),
- storageProperties.getMemoryComponentPageSize(),
- numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
- vbcs.add(vbc);
- }
- ioDeviceVirtualBufferCaches.put(ioDeviceNum, vbcs);
- return vbcs;
- }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
new file mode 100644
index 0000000..6e97d64
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.context;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
+import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
+import org.apache.hyracks.storage.common.buffercache.VirtualPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.storage.common.file.IFileMapManager;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycleComponent {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ // Page-usage counter of each registered memory component that belongs to a filtered primary index
+ // (added in register(), dropped in unregister(), reset to 0 in flushed()).
+ private final Map<ILSMMemoryComponent, AtomicInteger> memoryComponentUsageMap =
+ Collections.synchronizedMap(new HashMap<>());
+ // The same counters, keyed by the non-null file references of the owning memory component.
+ private final Map<FileReference, AtomicInteger> fileRefUsageMap = Collections.synchronizedMap(new HashMap<>());
+ // The same counters, keyed by file id; filled in when a tracked file is created or opened so that
+ // a counter can be looked up from a page id.
+ private final Int2ObjectMap<AtomicInteger> fileIdUsageMap =
+ Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+
+ // Primary indexes added by register(); flush candidates are picked from this list.
+ // It is read and modified only while holding this object's monitor.
+ private final List<ILSMIndex> primaryIndexes = new ArrayList<>();
+ // Position in primaryIndexes at which the next search for a flush candidate starts.
+ private volatile int flushPtr;
+ // Index selected for flushing whose flush has not been reported through flushed() yet; null if none.
+ private volatile ILSMIndex flushingIndex;
+
+ // Per-component page limit for filtered primary indexes; a value <= 0 disables the limit.
+ private final int filteredMemoryComponentMaxNumPages;
+ // Number of used pages at or above which a flush is requested.
+ private final int flushPageBudget;
+ // The cache that actually owns the pages; most operations are delegated to it.
+ private final VirtualBufferCache vbc;
+ // True between start() and stop(); also controls the flush thread's main loop.
+ private final AtomicBoolean isOpen = new AtomicBoolean(false);
+ private final FlushThread flushThread = new FlushThread();
+
+    /**
+     * Builds the global cache on top of a single {@link VirtualBufferCache} sized from the storage properties.
+     *
+     * @param allocator         allocator handed to the underlying virtual buffer cache
+     * @param storageProperties source of the global memory budget, page sizes, flush threshold and
+     *                          filtered-component page limit
+     */
+    public GlobalVirtualBufferCache(ICacheMemoryAllocator allocator, StorageProperties storageProperties) {
+        // number of pages that fit in the global memory component budget
+        final long budgetInPages = storageProperties.getMemoryComponentGlobalBudget()
+                / storageProperties.getMemoryComponentPageSize();
+        // NOTE(review): the page size comes from getBufferCachePageSize() while the page count above is
+        // derived from getMemoryComponentPageSize() -- confirm the two are meant to be interchangeable.
+        this.vbc = new VirtualBufferCache(allocator, storageProperties.getBufferCachePageSize(),
+                (int) budgetInPages);
+        // a flush is requested once this many pages are in use
+        this.flushPageBudget = (int) (budgetInPages * storageProperties.getMemoryComponentFlushThreshold());
+        this.filteredMemoryComponentMaxNumPages = storageProperties.getFilteredMemoryComponentMaxNumPages();
+    }
+
+ /** Returns the page size reported by the underlying virtual buffer cache. */
+ @Override
+ public int getPageSize() {
+ return vbc.getPageSize();
+ }
+
+ /** Returns the page size including the header, as reported by the underlying virtual buffer cache. */
+ @Override
+ public int getPageSizeWithHeader() {
+ return vbc.getPageSizeWithHeader();
+ }
+
+    /**
+     * Registers a memory component. Components of non-primary indexes are ignored. For a primary index the
+     * owning index is added to the flush candidates (once), and, when the index has filter fields, a fresh
+     * usage counter is associated with the component and with each of its non-null file references.
+     *
+     * @param memoryComponent the memory component being allocated
+     */
+    @Override
+    public synchronized void register(ILSMMemoryComponent memoryComponent) {
+        final ILSMIndex lsmIndex = memoryComponent.getLsmIndex();
+        if (!lsmIndex.isPrimaryIndex()) {
+            return;
+        }
+        // an index may register several memory components; track the index itself only once
+        if (!primaryIndexes.contains(lsmIndex)) {
+            primaryIndexes.add(lsmIndex);
+        }
+        if (lsmIndex.getNumOfFilterFields() <= 0) {
+            return;
+        }
+        // filtered primary index: one shared counter reachable from the component and from its files
+        final AtomicInteger counter = new AtomicInteger();
+        memoryComponentUsageMap.put(memoryComponent, counter);
+        for (FileReference fileRef : memoryComponent.getComponentFileRefs().getFileReferences()) {
+            if (fileRef != null) {
+                fileRefUsageMap.put(fileRef, counter);
+            }
+        }
+    }
+
+    /**
+     * Unregisters a memory component. Components of non-primary indexes are ignored. For a primary index
+     * the owning index is removed from the flush candidates (if still present) and, when the index has
+     * filter fields, the usage counters of the component and of its non-null file references are dropped.
+     * <p>
+     * The round-robin pointer is kept inside the bounds of the shrunken list so that the flush thread can
+     * always dereference it.
+     *
+     * @param memoryComponent the memory component being deallocated
+     */
+    @Override
+    public synchronized void unregister(ILSMMemoryComponent memoryComponent) {
+        ILSMIndex index = memoryComponent.getLsmIndex();
+        if (!index.isPrimaryIndex()) {
+            return;
+        }
+        int pos = primaryIndexes.indexOf(index);
+        if (pos >= 0) {
+            primaryIndexes.remove(pos);
+            if (flushPtr > pos) {
+                // The removed index was before flushPtr: shift the pointer left by one so that it still
+                // points to the same index.
+                flushPtr = flushPtr - 1;
+            }
+            if (flushPtr >= primaryIndexes.size()) {
+                // The removed index was the last element and flushPtr pointed at it (or the list is now
+                // empty): wrap around instead of leaving the pointer past the end of the list.
+                flushPtr = 0;
+            }
+        }
+        if (index.getNumOfFilterFields() > 0) {
+            memoryComponentUsageMap.remove(memoryComponent);
+            for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
+                if (ref != null) {
+                    fileRefUsageMap.remove(ref);
+                }
+            }
+        }
+    }
+
+ /**
+ * Called after a memory component has been flushed. If the component belongs to the index currently
+ * selected for flushing, the selection is cleared, every thread waiting on the operation trackers of the
+ * registered primary indexes is woken up, and the flush thread is notified again if usage still requires
+ * it. For a filtered primary index the component's page-usage counter is reset to 0.
+ *
+ * @param memoryComponent the memory component that was flushed
+ */
+ @Override
+ public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
+ // cheap unsynchronized check first; the condition is re-evaluated under the monitor below
+ if (memoryComponent.getLsmIndex() == flushingIndex) {
+ synchronized (this) {
+ if (memoryComponent.getLsmIndex() == flushingIndex) {
+ flushingIndex = null;
+ // After the flush operation is completed, we may have 2 cases:
+ // 1. there is no active reader on this memory component and memory is reclaimed;
+ // 2. there are still some active readers and memory cannot be reclaimed.
+ // But for both cases, we will notify all primary index op trackers to let their writers retry,
+ // if they have been blocked. Moreover, we will check whether more flushes are needed.
+ final int size = primaryIndexes.size();
+ for (int i = 0; i < size; i++) {
+ ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
+ synchronized (opTracker) {
+ opTracker.notifyAll();
+ }
+ }
+ }
+ }
+ // performed outside the monitor of this object
+ checkAndNotifyFlushThread();
+ }
+ if (memoryComponent.getLsmIndex().getNumOfFilterFields() > 0
+ && memoryComponent.getLsmIndex().isPrimaryIndex()) {
+ AtomicInteger usage = memoryComponentUsageMap.get(memoryComponent);
+ // the counter is absent if the component was never registered or has been unregistered
+ if (usage != null) {
+ // reset usage to 0 after the memory component is flushed
+ usage.set(0);
+ }
+ }
+ }
+
+ /** Returns the page budget of the underlying virtual buffer cache. */
+ @Override
+ public int getPageBudget() {
+ return vbc.getPageBudget();
+ }
+
+ /** Returns whether the underlying virtual buffer cache reports itself as full. */
+ @Override
+ public boolean isFull() {
+ return vbc.isFull();
+ }
+
+ /**
+ * A memory component is considered full when its index is the one currently selected for flushing, or
+ * when it is a filtered primary-index component that reached its page limit.
+ */
+ @Override
+ public boolean isFull(ILSMMemoryComponent memoryComponent) {
+ return memoryComponent.getLsmIndex() == flushingIndex || isFilteredMemoryComponentFull(memoryComponent);
+ }
+
+    /**
+     * Tells whether a filtered primary-index memory component has used up its page limit.
+     *
+     * @param memoryComponent the memory component to check
+     * @return {@code false} when the limit is disabled ({@code <= 0}), when the component does not belong to
+     *         a filtered primary index, or when no usage counter is tracked for it; otherwise whether the
+     *         tracked usage reached the limit
+     */
+    private boolean isFilteredMemoryComponentFull(ILSMMemoryComponent memoryComponent) {
+        if (filteredMemoryComponentMaxNumPages <= 0 || memoryComponent.getLsmIndex().getNumOfFilterFields() == 0
+                || !memoryComponent.getLsmIndex().isPrimaryIndex()) {
+            return false;
+        }
+        AtomicInteger usage = memoryComponentUsageMap.get(memoryComponent);
+        // The counter is missing if the component has not been registered (or was already unregistered);
+        // treat that as "not full" instead of failing with a NullPointerException, as flushed() does.
+        return usage != null && usage.get() >= filteredMemoryComponentMaxNumPages;
+    }
+
+ /** Creates the file in the underlying cache and links its file id to the usage counter of its owner, if any. */
+ @Override
+ public int createFile(FileReference fileRef) throws HyracksDataException {
+ int fileId = vbc.createFile(fileRef);
+ updateFileIdUsageMap(fileRef, fileId);
+ return fileId;
+ }
+
+ /** Opens the file in the underlying cache and links its file id to the usage counter of its owner, if any. */
+ @Override
+ public int openFile(FileReference fileRef) throws HyracksDataException {
+ int fileId = vbc.openFile(fileRef);
+ updateFileIdUsageMap(fileRef, fileId);
+ return fileId;
+ }
+
+ // Makes the usage counter registered for fileRef (if there is one) reachable through fileId as well.
+ private void updateFileIdUsageMap(FileReference fileRef, int fileId) {
+ AtomicInteger usage = fileRefUsageMap.get(fileRef);
+ if (usage != null) {
+ fileIdUsageMap.put(fileId, usage);
+ }
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void openFile(int fileId) throws HyracksDataException {
+ vbc.openFile(fileId);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void closeFile(int fileId) throws HyracksDataException {
+ vbc.closeFile(fileId);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void deleteFile(FileReference fileRef) throws HyracksDataException {
+ vbc.deleteFile(fileRef);
+ }
+
+ /**
+ * Delegates to the underlying virtual buffer cache.
+ * NOTE(review): the fileIdUsageMap entry for this file id is not removed here -- confirm this is intended.
+ */
+ @Override
+ public void deleteFile(int fileId) throws HyracksDataException {
+ vbc.deleteFile(fileId);
+ }
+
+ /**
+ * Pins the page in the underlying cache. When a new page is requested, one page is charged to the owning
+ * filtered memory component (if tracked) and the flush thread is notified if a flush is needed.
+ */
+ @Override
+ public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
+ ICachedPage page = vbc.pin(dpid, newPage);
+ if (newPage) {
+ incrementFilteredMemoryComponentUsage(dpid, 1);
+ checkAndNotifyFlushThread();
+ }
+ return page;
+ }
+
+    /**
+     * Adds {@code pages} to the usage counter of the filtered memory component that owns the file of the
+     * given page id. Does nothing when the per-component limit is disabled or the file is not tracked.
+     *
+     * @param dpid  page id whose file id identifies the owning component
+     * @param pages number of pages to add (may be negative)
+     */
+    private void incrementFilteredMemoryComponentUsage(long dpid, int pages) {
+        if (filteredMemoryComponentMaxNumPages <= 0) {
+            return;
+        }
+        // update memory usage of filtered index
+        final AtomicInteger counter = fileIdUsageMap.get(BufferedFileHandle.getFileId(dpid));
+        if (counter == null) {
+            return;
+        }
+        // We do not need extra code to flush this filtered memory component when it becomes full.
+        // This method is only called when there are active writers on this memory component.
+        // When the writer exits, it'll automatically flush this memory component when it finds out
+        // that this memory component becomes full.
+        counter.addAndGet(pages);
+    }
+
+    /**
+     * Wakes up the flush thread when the number of used pages reached the flush budget and no index is
+     * currently selected for flushing.
+     */
+    private void checkAndNotifyFlushThread() {
+        // For better performance, we only flush one dataset partition at a time.
+        // After reclaiming memory from this dataset partition, its memory can be used by other indexes.
+        // Thus, given N dataset partitions, each dataset partition will approximately receive 2/N of
+        // the total memory instead of 1/N, which doubles the memory utilization.
+        final boolean budgetReached = vbc.getUsage() >= flushPageBudget;
+        if (budgetReached && flushingIndex == null) {
+            // Notify the flush thread to schedule flushes. This is used to avoid deadlocks because page
+            // pins can be called while synchronizing on op trackers.
+            synchronized (flushThread.flushLock) {
+                flushThread.flushLock.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Resizes a page through the underlying cache and charges the change in frame count to the owning
+     * filtered memory component, if one is tracked. When the page grew, the flush thread is notified if a
+     * flush is needed.
+     *
+     * @param cPage                the page to resize; must be a {@link VirtualPage}
+     * @param multiplier           the new frame size multiplier
+     * @param extraPageBlockHelper passed through to the underlying cache
+     * @throws HyracksDataException if the underlying cache fails to resize the page
+     */
+    @Override
+    public void resizePage(ICachedPage cPage, int multiplier, IExtraPageBlockHelper extraPageBlockHelper)
+            throws HyracksDataException {
+        // Capture the multiplier BEFORE delegating: once the page has been resized it is expected to report
+        // the new multiplier, which would make the delta below always 0.
+        final int originalMultiplier = cPage.getFrameSizeMultiplier();
+        vbc.resizePage(cPage, multiplier, extraPageBlockHelper);
+        // account for the change only after the resize succeeded
+        final int delta = multiplier - originalMultiplier;
+        incrementFilteredMemoryComponentUsage(((VirtualPage) cPage).dpid(), delta);
+        if (delta > 0) {
+            checkAndNotifyFlushThread();
+        }
+    }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void unpin(ICachedPage page) throws HyracksDataException {
+ vbc.unpin(page);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void flush(ICachedPage page) throws HyracksDataException {
+ vbc.flush(page);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void force(int fileId, boolean metadata) throws HyracksDataException {
+ vbc.force(fileId, metadata);
+ }
+
+ /** Intentionally empty: the underlying cache is opened in {@link #start()}. */
+ @Override
+ public void open() throws HyracksDataException {
+
+ }
+
+ /** Intentionally empty: the underlying cache is closed in {@link #stop(boolean, OutputStream)}. */
+ @Override
+ public void close() throws HyracksDataException {
+ // no op
+ }
+
+ /**
+ * Opens the underlying cache and starts the flush thread. Only the call that flips {@code isOpen} from
+ * false to true does any work.
+ * NOTE(review): a java.lang.Thread cannot be started twice, so start() after stop() presumably is not
+ * expected -- confirm.
+ */
+ @Override
+ public void start() {
+ if (isOpen.compareAndSet(false, true)) {
+ try {
+ vbc.open();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException("Fail to open virtual buffer cache ", e);
+ }
+ flushThread.start();
+ }
+ }
+
+ /**
+ * Closes the underlying cache, wakes up the flush thread so that it can observe {@code isOpen == false},
+ * and waits for it to terminate. Only the call that flips {@code isOpen} from true to false does any work.
+ * NOTE(review): if vbc.close() throws, the flush thread is neither notified nor joined.
+ *
+ * @param dumpState   whether to write the state to {@code ouputStream} before closing
+ * @param ouputStream destination of the state dump
+ */
+ @Override
+ public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+ if (isOpen.compareAndSet(true, false)) {
+ if (dumpState) {
+ dumpState(ouputStream);
+ }
+ vbc.close();
+ synchronized (flushThread.flushLock) {
+ flushThread.flushLock.notifyAll();
+ }
+ try {
+ flushThread.join();
+ } catch (InterruptedException e) {
+ // restore the interrupt status before reporting the failure
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+
+ /** Writes the string form of the underlying cache to the stream, encoded with the platform default charset. */
+ @Override
+ public void dumpState(OutputStream os) throws IOException {
+ os.write(vbc.toString().getBytes());
+ }
+
+ // The methods below forward to the underlying virtual buffer cache without any additional bookkeeping.
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public IFileMapManager getFileMapProvider() {
+ return vbc.getFileMapProvider();
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public int getNumPagesOfFile(int fileId) throws HyracksDataException {
+ return vbc.getNumPagesOfFile(fileId);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void returnPage(ICachedPage page) {
+ vbc.returnPage(page);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public IFIFOPageWriter createFIFOWriter(IPageWriteCallback callback, IPageWriteFailureCallback failureCallback) {
+ return vbc.createFIFOWriter(callback, failureCallback);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
+ return vbc.confiscatePage(dpid);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId)
+ throws HyracksDataException {
+ return vbc.confiscateLargePage(dpid, multiplier, extraBlockPageId);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void returnPage(ICachedPage page, boolean reinsert) {
+ vbc.returnPage(page, reinsert);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public int getFileReferenceCount(int fileId) {
+ return vbc.getFileReferenceCount(fileId);
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public boolean isReplicationEnabled() {
+ return vbc.isReplicationEnabled();
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public IIOReplicationManager getIOReplicationManager() {
+ return vbc.getIOReplicationManager();
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void purgeHandle(int fileId) throws HyracksDataException {
+ vbc.purgeHandle(fileId);
+ }
+
+ /** Returns the string form of the underlying virtual buffer cache. */
+ @Override
+ public String toString() {
+ return vbc.toString();
+ }
+
+ /** Delegates to the underlying virtual buffer cache. */
+ @Override
+ public void closeFileIfOpen(FileReference fileRef) {
+ vbc.closeFileIfOpen(fileRef);
+ }
+
+ /** Returns the usage reported by the underlying virtual buffer cache (compared against the flush budget). */
+ @Override
+ public int getUsage() {
+ return vbc.getUsage();
+ }
+
+ /**
+ * We use a dedicated thread to schedule flushes to avoid deadlock. We cannot schedule flushes directly during
+ * page pins because page pins can be called while synchronized on op trackers (e.g., when resetting a
+ * memory component).
+ */
+ private class FlushThread extends Thread {
+ private final Object flushLock = new Object();
+
+        /**
+         * Main loop of the flush thread: waits on {@code flushLock} until notified, then tries to schedule a
+         * flush, and repeats while the cache is open. Any failure while scheduling a flush halts the JVM.
+         */
+        @Override
+        public void run() {
+            while (isOpen.get()) {
+                synchronized (flushLock) {
+                    try {
+                        // Re-check the flag while holding the lock. stop() clears isOpen first and notifies
+                        // under this same lock afterwards; without this check a notification sent between
+                        // the loop condition and wait() would be lost and stop() would block in join().
+                        if (isOpen.get()) {
+                            flushLock.wait();
+                        }
+                    } catch (InterruptedException e) {
+                        LOGGER.error("Flushing thread is interrupted unexpectedly.", e);
+                    }
+                }
+                // a wake-up is only a hint; scheduleFlush() re-validates the conditions itself
+                if (isOpen.get()) {
+                    try {
+                        scheduleFlush();
+                    } catch (Throwable e) {
+                        LOGGER.error("Unexpected exception when trying to schedule flushes.", e);
+                        ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
+                    }
+                }
+            }
+        }
+
+ /**
+ * Selects the next primary index to flush, starting at {@code flushPtr} and moving round-robin through
+ * {@code primaryIndexes}, and asks its operation tracker to flush. Returns without doing anything when
+ * usage is below the flush budget or another index is already selected for flushing.
+ * NOTE(review): primaryIndexes.get(flushPtr) assumes the list is non-empty and flushPtr is in range --
+ * confirm this always holds when usage reaches the budget.
+ *
+ * @throws IllegalStateException if no primary index has a modified current memory component
+ */
+ private void scheduleFlush() throws HyracksDataException {
+ synchronized (GlobalVirtualBufferCache.this) {
+ // same condition as checkAndNotifyFlushThread(), re-evaluated under the monitor
+ if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
+ return;
+ }
+ int cycles = 0;
+ // find the first modified memory component while avoiding infinite loops
+ while (cycles <= primaryIndexes.size()
+ && !primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
+ flushPtr = (flushPtr + 1) % primaryIndexes.size();
+ cycles++;
+ }
+ if (primaryIndexes.get(flushPtr).getCurrentMemoryComponent().isModified()) {
+ // flush the current memory component
+ flushingIndex = primaryIndexes.get(flushPtr);
+ // advance so that the next round starts after the index just selected
+ flushPtr = (flushPtr + 1) % primaryIndexes.size();
+ // we need to manually flush this memory component because it may be idle at this point
+ // note that this is different from flushing a filtered memory component
+ PrimaryIndexOperationTracker opTracker =
+ (PrimaryIndexOperationTracker) flushingIndex.getOperationTracker();
+ synchronized (opTracker) {
+ opTracker.setFlushOnExit(true);
+ opTracker.flushIfNeeded();
+ // If the flush cannot be scheduled at this time, then there must be active writers.
+ // The flush will be eventually scheduled when writers exit
+ }
+ } else {
+ // propagates to run(), which logs the error and halts the JVM
+ throw new IllegalStateException(
+ "Cannot find modified memory component after checking all primary indexes");
+ }
+ }
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
deleted file mode 100644
index 2a26f08..0000000
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/DatasetMemoryManagerTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.test.context;
-
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.context.DatasetMemoryManager;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class DatasetMemoryManagerTest {
-
- private static final StorageProperties storageProperties;
- private static final long GLOBAL_BUDGET = 1000L;
- private static final long METADATA_DATASET_BUDGET = 200L;
- private static final long DATASET_BUDGET = 400L;
-
- static {
- storageProperties = Mockito.mock(StorageProperties.class);
- Mockito.when(storageProperties.getMemoryComponentGlobalBudget()).thenReturn(GLOBAL_BUDGET);
- Mockito.when(storageProperties.getMemoryComponentNumPages()).thenReturn(8);
- Mockito.when(storageProperties.getMetadataMemoryComponentNumPages()).thenReturn(4);
- Mockito.when(storageProperties.getMemoryComponentPageSize()).thenReturn(50);
- Mockito.when(storageProperties.getMemoryComponentsNum()).thenReturn(2);
- }
-
- @Test
- public void allocate() {
- DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
- // double allocate
- Assert.assertTrue(memoryManager.allocate(1));
- boolean thrown = false;
- try {
- memoryManager.allocate(1);
- } catch (IllegalStateException e) {
- Assert.assertTrue(e.getMessage().contains("already allocated"));
- thrown = true;
- }
- Assert.assertTrue(thrown);
-
- // allocate metadata and non-metadata datasets
- Assert.assertTrue(memoryManager.allocate(400));
-
- long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET - DATASET_BUDGET;
- Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-
- // reserve after allocate shouldn't allocate the budget again
- Assert.assertTrue(memoryManager.allocate(401));
- Assert.assertTrue(memoryManager.reserve(401));
-
- // deallocate should still keep the reserved memory
- memoryManager.deallocate(401);
- expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET - (DATASET_BUDGET * 2);
- Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-
- // exceed budget should return false
- Assert.assertFalse(memoryManager.allocate(402));
- }
-
- @Test
- public void reserve() {
- DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
- // reserve then allocate budget
- Assert.assertTrue(memoryManager.reserve(1));
- Assert.assertTrue(memoryManager.allocate(1));
- long expectedBudget = GLOBAL_BUDGET - METADATA_DATASET_BUDGET;
- Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-
- // double reserve
- Assert.assertTrue(memoryManager.reserve(2));
-
- // cancel reserved
- memoryManager.cancelReserved(2);
- Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
- }
-
- @Test
- public void deallocate() {
- DatasetMemoryManager memoryManager = new DatasetMemoryManager(storageProperties);
- // deallocate reserved
- Assert.assertTrue(memoryManager.reserve(200));
- Assert.assertTrue(memoryManager.allocate(200));
- memoryManager.deallocate(200);
- long expectedBudget = GLOBAL_BUDGET - DATASET_BUDGET;
- Assert.assertEquals(memoryManager.getAvailable(), expectedBudget);
-
- // deallocate not allocated
- boolean thrown = false;
- try {
- memoryManager.deallocate(1);
- } catch (IllegalStateException e) {
- Assert.assertTrue(e.getMessage().contains("No allocated"));
- thrown = true;
- }
- Assert.assertTrue(thrown);
-
- // double deallocate
- memoryManager.allocate(2);
- memoryManager.deallocate(2);
- thrown = false;
- try {
- memoryManager.deallocate(2);
- } catch (IllegalStateException e) {
- Assert.assertTrue(e.getMessage().contains("No allocated"));
- thrown = true;
- }
- Assert.assertTrue(thrown);
- }
-}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 69fc396..c9de359 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -325,10 +325,6 @@
public static void enlistMetadataDataset(INCServiceContext ncServiceCtx, IMetadataIndex index)
throws HyracksDataException {
final int datasetId = index.getDatasetId().getId();
- // reserve memory for metadata dataset to ensure it can be opened when needed
- if (!appContext.getDatasetMemoryManager().reserve(index.getDatasetId().getId())) {
- throw new IllegalStateException("Failed to reserve memory for metadata dataset (" + datasetId + ")");
- }
String metadataPartitionPath =
StoragePathUtil.prepareStoragePartitionPath(MetadataNode.INSTANCE.getMetadataStoragePartition());
String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index fe64e9f..c9505a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -21,11 +21,9 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -50,42 +48,17 @@
this.memoryUsed = 0;
}
- private boolean evictCandidateIndex() throws HyracksDataException {
- // Why min()? As a heuristic for eviction, we will take an open index
- // (an index consuming memory)
- // that is not being used (refcount == 0) and has been least recently
- // used. The sort order defined
- // for IndexInfo maintains this. See IndexInfo.compareTo().
- IndexInfo info = Collections.min(indexInfos.values());
- if (info.referenceCount != 0 || !info.isOpen) {
- return false;
- }
-
- info.index.deactivate();
- //find resource name and deallocate its memory
- for (Entry<String, IndexInfo> entry : indexInfos.entrySet()) {
- if (entry.getValue() == info) {
- deallocateMemory(entry.getKey());
- break;
- }
- }
- info.isOpen = false;
- return true;
- }
-
private class IndexInfo implements Comparable<IndexInfo> {
private final IIndex index;
private int referenceCount;
private long lastAccess;
private boolean isOpen;
- private boolean memoryAllocated;
public IndexInfo(IIndex index) {
this.index = index;
this.lastAccess = -1;
this.referenceCount = 0;
this.isOpen = false;
- this.memoryAllocated = false;
}
public void touch() {
@@ -202,7 +175,6 @@
}
if (!info.isOpen) {
- allocateMemory(resourcePath);
info.index.activate();
info.isOpen = true;
}
@@ -234,40 +206,7 @@
if (info.isOpen) {
info.index.deactivate();
- deallocateMemory(resourcePath);
}
indexInfos.remove(resourcePath);
}
-
- @Override
- public void allocateMemory(String resourcePath) throws HyracksDataException {
- IndexInfo info = indexInfos.get(resourcePath);
- if (info == null) {
- throw new HyracksDataException("Failed to allocate memory for index with resource ID " + resourcePath
- + " since it does not exist.");
- }
- if (!info.memoryAllocated) {
- long inMemorySize = info.index.getMemoryAllocationSize();
- while (memoryUsed + inMemorySize > memoryBudget) {
- if (!evictCandidateIndex()) {
- throw new HyracksDataException(
- "Cannot allocate memory for index since memory budget would be exceeded.");
- }
- }
- memoryUsed += inMemorySize;
- info.memoryAllocated = true;
- }
- }
-
- private void deallocateMemory(String resourcePath) throws HyracksDataException {
- IndexInfo info = indexInfos.get(resourcePath);
- if (info == null) {
- throw new HyracksDataException("Failed to deallocate memory for index with resource name " + resourcePath
- + " since it does not exist.");
- }
- if (info.isOpen && info.memoryAllocated) {
- memoryUsed -= info.index.getMemoryAllocationSize();
- info.memoryAllocated = false;
- }
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 3158b79..5e922b1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -337,11 +337,6 @@
}
- @Override
- public long getMemoryAllocationSize() {
- return 0;
- }
-
public IBinaryComparatorFactory[] getCmpFactories() {
return cmpFactories;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
index 831562c..a66a890 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeMemoryComponent.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
public class LSMBTreeMemoryComponent extends AbstractLSMMemoryComponent {
@@ -38,4 +39,9 @@
public BTree getIndex() {
return btree;
}
+
+ /**
+ * Builds a new {@link LSMComponentFileReferences} whose first slot is the file reference of this
+ * component's BTree; the other two slots are left null.
+ */
+ @Override
+ public LSMComponentFileReferences getComponentFileRefs() {
+ return new LSMComponentFileReferences(btree.getFileReference(), null, null);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMemoryComponent.java
index ed2ee70..feed07e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyMemoryComponent.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMWithBuddyMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
/*
* This class is also not needed at the moment but is implemented anyway
@@ -56,7 +57,7 @@
}
@Override
- public long getSize() {
- return 0L;
+ public LSMComponentFileReferences getComponentFileRefs() {
+ return new LSMComponentFileReferences(btree.getFileReference(), buddyBtree.getFileReference(), null);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index c86f7b9..064ab64 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.api;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.common.impls.MemoryComponentMetadata;
public interface ILSMMemoryComponent extends ILSMComponent {
@@ -48,11 +49,6 @@
void reset() throws HyracksDataException;
/**
- * @return true if the memory budget has been exceeded
- */
- boolean isFull();
-
- /**
* @return true if there are data in the memory component, false otherwise
*/
boolean isModified();
@@ -86,11 +82,6 @@
void validate() throws HyracksDataException;
/**
- * @return the size of the memory component
- */
- long getSize();
-
- /**
* Reset the component Id of the memory component after it's recycled
*
* @param newId
@@ -105,4 +96,17 @@
* entry to the component
*/
void setUnwritable();
+
+ /**
+ *
+ * @return the file references of the component
+ */
+ LSMComponentFileReferences getComponentFileRefs();
+
+ /**
+ * Called when the memory component is flushed to disk
+ * @throws HyracksDataException
+ */
+ void flushed() throws HyracksDataException;
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java
index ff7bfb5..bbe6051 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java
@@ -25,9 +25,42 @@
public interface IVirtualBufferCache extends IBufferCache {
void open() throws HyracksDataException;
+ /**
+ *
+ * @return true if the overall memory usage exceeds the budget
+ */
boolean isFull();
- void reset();
+ /**
+ * @param memoryComponent
+ * @return true if the memory component's memory usage exceeds its budget
+ */
+ boolean isFull(ILSMMemoryComponent memoryComponent);
IFileMapManager getFileMapProvider();
+
+ /**
+ *
+ * @return the number of in-use pages
+ */
+ int getUsage();
+
+ /**
+ * Register the memory component when it is allocated
+ * @param memoryComponent
+ */
+ void register(ILSMMemoryComponent memoryComponent);
+
+ /**
+ * Unregister the memory component when it is deallocated
+ * @param memoryComponent
+ */
+ void unregister(ILSMMemoryComponent memoryComponent);
+
+ /**
+ * Notify the virtual buffer cache that the memory component has been flushed to disk
+ * @param memoryComponent
+ * @throws HyracksDataException
+ */
+ void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 01a140f..6c7b1f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -809,15 +809,6 @@
}
@Override
- public long getMemoryAllocationSize() {
- long size = 0;
- for (ILSMMemoryComponent c : memoryComponents) {
- size += c.getSize();
- }
- return size;
- }
-
- @Override
public void resetCurrentComponentIndex() {
synchronized (lsmHarness.getOperationTracker()) {
// validate no reader in any of the memory components and that all of them are INVALID
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 9440648..a6c82bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -29,7 +29,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -60,7 +59,7 @@
*/
@Override
public void schedule(LSMIOOperationType ioOperationType) throws HyracksDataException {
- activeate();
+ activate();
if (ioOperationType == LSMIOOperationType.FLUSH) {
if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) {
if (writerCount != 0) {
@@ -79,7 +78,7 @@
}
}
- private void activeate() throws HyracksDataException {
+ private void activate() throws HyracksDataException {
if (state == ComponentState.INACTIVE) {
state = ComponentState.READABLE_WRITABLE;
lsmIndex.getIOOperationCallback().recycled(this);
@@ -88,7 +87,7 @@
@Override
public boolean threadEnter(LSMOperationType opType, boolean isMutableComponent) throws HyracksDataException {
- activeate();
+ activate();
switch (opType) {
case FORCE_MODIFICATION:
if (isMutableComponent) {
@@ -108,7 +107,9 @@
break;
case MODIFICATION:
if (isMutableComponent) {
- if (state == ComponentState.READABLE_WRITABLE) {
+ if (state == ComponentState.READABLE_WRITABLE && !vbc.isFull(this) && !vbc.isFull()) {
+ // Even when the memory component has the writable state, vbc may be temporarily full
+ // or this memory component may be full.
writerCount++;
} else {
return false;
@@ -159,7 +160,9 @@
writerCount--;
// A failed operation should not change the component state since it's better for
// the failed operation's effect to be no-op.
- if (state == ComponentState.READABLE_WRITABLE && !failedOperation && isFull()) {
+ if (state == ComponentState.READABLE_WRITABLE && !failedOperation && vbc.isFull(this)) {
+ // only mark the component state as unwritable when this memory component
+ // is full
state = ComponentState.READABLE_UNWRITABLE;
}
} else {
@@ -186,6 +189,7 @@
}
// operation succeeded
if (readerCount == 0) {
+ // TODO: move reset() outside of the synchronized block (on op tracker)
reset();
} else {
state = ComponentState.UNREADABLE_UNWRITABLE;
@@ -224,11 +228,6 @@
}
@Override
- public boolean isFull() {
- return vbc.isFull();
- }
-
- @Override
public final void reset() throws HyracksDataException {
state = ComponentState.INACTIVE;
isModified.set(false);
@@ -269,7 +268,8 @@
@Override
public final void allocate() throws HyracksDataException {
boolean allocated = false;
- ((IVirtualBufferCache) getIndex().getBufferCache()).open();
+ vbc.open();
+ vbc.register(this);
try {
doAllocate();
allocated = true;
@@ -300,6 +300,7 @@
try {
state = ComponentState.INACTIVE;
doDeallocate();
+ vbc.unregister(this);
} finally {
getIndex().getBufferCache().close();
}
@@ -317,12 +318,6 @@
}
@Override
- public long getSize() {
- IBufferCache virtualBufferCache = getIndex().getBufferCache();
- return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize();
- }
-
- @Override
public ILSMComponentId getId() {
return componentId;
}
@@ -344,6 +339,11 @@
}
@Override
+ public void flushed() throws HyracksDataException {
+ vbc.flushed(this);
+ }
+
+ @Override
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\", \"state\":\"" + state + "\", \"writers\":"
+ writerCount + ", \"readers\":" + readerCount + ", \"pendingFlushes\":" + pendingFlushes
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 65d2fcf..20a2555 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -256,6 +256,11 @@
throw e; // NOSONAR: The last call in the finally clause
}
}
+ if (opType == LSMOperationType.FLUSH) {
+ ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0);
+ // We must call flushed without synchronizing on opTracker to avoid deadlocks
+ flushingComponent.flushed();
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 5edcec8..6f3d2eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
@@ -128,11 +129,6 @@
}
@Override
- public void reset() {
- vbc.reset();
- }
-
- @Override
public IFileMapManager getFileMapProvider() {
return vbc.getFileMapProvider();
}
@@ -221,4 +217,29 @@
public void closeFileIfOpen(FileReference fileRef) {
throw new UnsupportedOperationException();
}
-}
+
+ @Override
+ public boolean isFull(ILSMMemoryComponent memoryComponent) {
+ return vbc.isFull(memoryComponent);
+ }
+
+ @Override
+ public int getUsage() {
+ return vbc.getUsage();
+ }
+
+ @Override
+ public void register(ILSMMemoryComponent memoryComponent) {
+ vbc.register(memoryComponent);
+ }
+
+ @Override
+ public void unregister(ILSMMemoryComponent memoryComponent) {
+ vbc.unregister(memoryComponent);
+ }
+
+ @Override
+ public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
+ vbc.flushed(memoryComponent);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index f24289f..5871b31 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -46,8 +47,15 @@
import org.apache.logging.log4j.Logger;
public class VirtualBufferCache implements IVirtualBufferCache {
+ /**
+ * Load factor used to size the bucket array, based on the default load factor of {@code HashMap}.
+ * Note that when a memory component is flushed, it scans vbc to clean up its pages while synchronized on
+ * the op tracker. Thus, it may not be a good idea to set the map factor too large because otherwise it
+ * would increase the blocking time.
+ */
+ private static final float MAP_FACTOR = 0.75f;
private static final Logger LOGGER = LogManager.getLogger();
- private static final boolean DEBUG = true;
+ private static final boolean DEBUG = false;
private final ICacheMemoryAllocator allocator;
private final IFileMapManager fileMapManager;
private final int pageSize;
@@ -66,7 +74,7 @@
throw new IllegalArgumentException("Page Budget Cannot be 0");
}
this.pageBudget = pageBudget;
- buckets = new CacheBucket[this.pageBudget];
+ buckets = new CacheBucket[(int) (this.pageBudget / MAP_FACTOR)];
freePages = new ArrayBlockingQueue<>(this.pageBudget);
largePages = new AtomicInteger(0);
used = new AtomicInteger(0);
@@ -87,6 +95,7 @@
return largePages.get();
}
+ @Override
public int getUsage() {
return used.get();
}
@@ -106,6 +115,12 @@
}
@Override
+ public boolean isFull(ILSMMemoryComponent memoryComponent) {
+ // the memory component needs to be flushed when the vbc is full
+ return isFull();
+ }
+
+ @Override
public int createFile(FileReference fileRef) throws HyracksDataException {
synchronized (fileMapManager) {
return fileMapManager.registerFile(fileRef);
@@ -320,7 +335,7 @@
throw HyracksDataException.create(ErrorCode.VBC_ALREADY_OPEN);
}
allocator.reserveAllocation(pageSize, pageBudget);
- for (int i = 0; i < pageBudget; i++) {
+ for (int i = 0; i < buckets.length; i++) {
buckets[i] = new CacheBucket();
}
largePages.set(0);
@@ -329,36 +344,12 @@
}
@Override
- public void reset() {
- recycleAllPages();
- used.set(0);
- largePages.set(0);
- }
-
- private void recycleAllPages() {
- for (int i = 0; i < buckets.length; i++) {
- final CacheBucket bucket = buckets[i];
- bucket.bucketLock.lock();
- try {
- VirtualPage curr = bucket.cachedPage;
- while (curr != null) {
- bucket.cachedPage = curr.next();
- recycle(curr);
- curr = bucket.cachedPage;
- }
- } finally {
- bucket.bucketLock.unlock();
- }
- }
- }
-
- @Override
public void close() throws HyracksDataException {
if (!open) {
throw HyracksDataException.create(ErrorCode.VBC_ALREADY_CLOSED);
}
freePages.clear();
- for (int i = 0; i < pageBudget; i++) {
+ for (int i = 0; i < buckets.length; i++) {
buckets[i].cachedPage = null;
}
open = false;
@@ -464,4 +455,19 @@
throw new UnsupportedOperationException();
}
+ @Override
+ public void register(ILSMMemoryComponent memoryComponent) {
+ // no op
+ }
+
+ @Override
+ public void unregister(ILSMMemoryComponent memoryComponent) {
+ // no op
+ }
+
+ @Override
+ public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
+ // no op
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMemoryComponent.java
index a8005bc..3b6c1b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMemoryComponent.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMWithBuddyMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
public class LSMInvertedIndexMemoryComponent extends AbstractLSMWithBuddyMemoryComponent {
@@ -46,4 +47,10 @@
public BTree getBuddyIndex() {
return deletedKeysBTree;
}
+
+ @Override
+ public LSMComponentFileReferences getComponentFileRefs() {
+ return new LSMComponentFileReferences(invIndex.getBTree().getFileReference(),
+ deletedKeysBTree.getFileReference(), null);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index a7ce35c..bfbb141 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -148,12 +148,6 @@
}
@Override
- public long getMemoryAllocationSize() {
- IBufferCache virtualBufferCache = btree.getBufferCache();
- return (long) virtualBufferCache.getPageBudget() * virtualBufferCache.getPageSize();
- }
-
- @Override
public InvertedListCursor createInvertedListCursor(IHyracksTaskContext ctx) {
return new InMemoryInvertedListCursor(invListTypeTraits.length, tokenTypeTraits.length);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 42171d1..ec1f143 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -661,11 +661,6 @@
}
}
- @Override
- public long getMemoryAllocationSize() {
- return 0;
- }
-
protected static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
ITypeTraits[] btreeTypeTraits = new ITypeTraits[tokenTypeTraits.length + btreeValueTypeTraits.length];
// Set key type traits.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMemoryComponent.java
index ef7b815..7f72d71 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMemoryComponent.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMWithBuddyMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
import org.apache.hyracks.storage.am.rtree.impls.RTree;
public class LSMRTreeMemoryComponent extends AbstractLSMWithBuddyMemoryComponent {
@@ -53,4 +54,9 @@
throw new UnsupportedOperationException("Validation not implemented for LSM R-Trees.");
}
+ @Override
+ public LSMComponentFileReferences getComponentFileRefs() {
+ return new LSMComponentFileReferences(rtree.getFileReference(), btree.getFileReference(), null);
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
index 861fd2d..b715609 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
@@ -128,11 +128,6 @@
IBufferCache getBufferCache();
/**
- * @return the size, in bytes, of pre-allocated memory space that this index was allotted.
- */
- public long getMemoryAllocationSize();
-
- /**
* @param fillFactor
* @param verifyInput
* @throws HyracksDataException
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
index b653842..6200680 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceLifecycleManager.java
@@ -28,7 +28,7 @@
* @param <R>
* resource class
*/
-public interface IResourceLifecycleManager<R> extends IResourceMemoryManager {
+public interface IResourceLifecycleManager<R> {
/**
* get a list of all resources which are opened
*
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java
deleted file mode 100644
index 036520a..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.common;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public interface IResourceMemoryManager {
- void allocateMemory(String resourcePath) throws HyracksDataException;
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java
deleted file mode 100644
index 3b2e3cb..0000000
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.common.buffercache;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.common.IResourceMemoryManager;
-
-public class ResourceHeapBufferAllocator implements ICacheMemoryAllocator {
-
- final IResourceMemoryManager memoryManager;
- final String resourceName;
-
- public ResourceHeapBufferAllocator(IResourceMemoryManager memoryManager, String resourceName) {
- this.memoryManager = memoryManager;
- this.resourceName = resourceName;
- }
-
- @Override
- public ByteBuffer[] allocate(int pageSize, int numPages) {
- ByteBuffer[] buffers = new ByteBuffer[numPages];
- for (int i = 0; i < numPages; ++i) {
- buffers[i] = ByteBuffer.allocate(pageSize);
- }
- return buffers;
- }
-
- @Override
- public ByteBuffer[] ensureAvailabilityThenAllocate(int pageSize, int numPages) throws HyracksDataException {
- reserveAllocation(pageSize, numPages);
- return allocate(pageSize, numPages);
- }
-
- @Override
- public void reserveAllocation(int pageSize, int numPages) throws HyracksDataException {
- memoryManager.allocateMemory(resourceName);
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
index d5f3fb2..811987c 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.impl;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+
public interface IVirtualBufferCacheCallback {
- void isFullChanged(boolean newValue);
+ void isFullChanged(boolean newValue, ILSMMemoryComponent memoryComponent);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
index 63a51b9..8cad497 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java
@@ -20,11 +20,13 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.replication.IIOReplicationManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
@@ -35,6 +37,7 @@
public class TestVirtualBufferCache implements IVirtualBufferCache {
private final IVirtualBufferCache vbc;
+ private final ConcurrentHashMap<ILSMMemoryComponent, AtomicBoolean> isFullMap = new ConcurrentHashMap<>();
private final AtomicBoolean isFull = new AtomicBoolean(false);
private final List<IVirtualBufferCacheCallback> callbacks;
@@ -87,7 +90,14 @@
@Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
- return vbc.pin(dpid, newPage);
+ ICachedPage page = vbc.pin(dpid, newPage);
+ // the memory component can be full after each pin, but isFull may not be called by the memory component
+ // for correctness, we call isFull here after each pin
+ for (ILSMMemoryComponent component : isFullMap.keySet()) {
+ isFull(component);
+ }
+
+ return page;
}
@Override
@@ -190,19 +200,27 @@
@Override
public boolean isFull() {
boolean newValue = vbc.isFull();
- if (isFull.compareAndSet(!newValue, newValue)) {
- synchronized (callbacks) {
- for (int i = 0; i < callbacks.size(); i++) {
- callbacks.get(i).isFullChanged(newValue);
- }
- }
- }
+ updateFullValue(newValue, null);
return newValue;
}
@Override
- public void reset() {
- vbc.reset();
+ public boolean isFull(ILSMMemoryComponent memoryComponent) {
+ boolean newValue = vbc.isFull(memoryComponent);
+ updateFullValue(newValue, memoryComponent);
+ return newValue;
+ }
+
+ private void updateFullValue(boolean newValue, ILSMMemoryComponent memoryComponent) {
+ AtomicBoolean isFull = memoryComponent != null
+ ? isFullMap.computeIfAbsent(memoryComponent, m -> new AtomicBoolean()) : this.isFull;
+ if (isFull.compareAndSet(!newValue, newValue)) {
+ synchronized (callbacks) {
+ for (int i = 0; i < callbacks.size(); i++) {
+ callbacks.get(i).isFullChanged(newValue, memoryComponent);
+ }
+ }
+ }
}
@Override
@@ -215,4 +233,29 @@
throw new UnsupportedOperationException();
}
+ @Override
+ public int getUsage() {
+ return vbc.getUsage();
+ }
+
+ @Override
+ public void register(ILSMMemoryComponent memoryComponent) {
+ vbc.register(memoryComponent);
+ }
+
+ @Override
+ public void unregister(ILSMMemoryComponent memoryComponent) {
+ vbc.unregister(memoryComponent);
+ }
+
+ @Override
+ public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
+ vbc.flushed(memoryComponent);
+ }
+
+ public void reset() {
+ isFull.set(false);
+ isFullMap.clear();
+ }
+
}