[NO ISSUE][STO] Fix memory leaks in storage
- user model changes: no
- storage format changes: no
- interface changes: yes
- Added javadocs to:
-- IBufferCache
-- IExtraPageBlockHelper
- Moved IBufferCache.setPageDiskId -> ICachedPage.setDiskPageId
- Renamed:
-- IBufferCache.flushDirtyPage -> IBufferCache.flush
-- IBufferCache.getNumPages -> IBufferCache.getPageBudget
- Removed:
-- IBufferCache.adviseWontNeed [not used]
-- IBufferCache.tryPin [not used]
details:
- Previously, when adding a kv pair to the metadata of a memory
component, we add a new Pair item to the ArrayList. After
this change, we only update it if it exists.
- VirtualBufferCache used to leak pages when reclaiming pages
of a file after deletion. This has also been fixed.
- New tests for VirtualBufferCache added:
- Checks for memory budget after end of testDisjointPins
- Concurrent Users pinning pages concurrently
- Test for large pages and ensuring allocated large
pages are accounted for through removal of cached
free pages.
Change-Id: I4ae9736c9b5fdba5795245bdf835c023e3f73b15
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2115
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index cbcc44f..d13a15d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -31,7 +31,6 @@
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveNotificationHandler;
-import org.apache.asterix.active.SingleThreadEventProcessor;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -48,6 +47,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
public class ActiveNotificationHandler extends SingleThreadEventProcessor<ActiveEvent>
implements IActiveNotificationHandler, IJobLifecycleListener {
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index ce7eb3d..7eba9eb 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -77,7 +77,7 @@
</property>
<property>
<name>storage.memorycomponent.numpages</name>
- <value>8</value>
+ <value>16</value>
<description>The number of pages to allocate for a memory component.
This budget is shared by all the memory components of the primary
index and all its secondary indexes across all I/O devices on a node.
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 104f80b..b155b51 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.bootstrap;
import java.io.File;
+import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -36,20 +37,27 @@
import org.apache.asterix.common.context.TransactionSubsystemProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
@@ -81,11 +89,13 @@
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
@@ -113,9 +123,7 @@
public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = TransactionSubsystemProvider.INSTANCE;
// Mutables
- private JobId jobId;
private long jobCounter = 0L;
- private IHyracksJobletContext jobletCtx;
private final String testConfigFileName;
private final boolean runHDFS;
@@ -137,9 +145,6 @@
th.printStackTrace();
throw th;
}
- jobletCtx = Mockito.mock(IHyracksJobletContext.class);
- Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
- Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
}
public void deInit() throws Exception {
@@ -147,20 +152,24 @@
ExecutionTestUtil.tearDown(cleanupOnStop);
}
- public org.apache.asterix.common.transactions.JobId getTxnJobId() {
- return new org.apache.asterix.common.transactions.JobId((int) jobId.getId());
+ public org.apache.asterix.common.transactions.JobId getTxnJobId(IHyracksTaskContext ctx) {
+ return new org.apache.asterix.common.transactions.JobId((int) ctx.getJobletContext().getJobId().getId());
}
public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
- Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
- ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields,
+ Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
- StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException {
+ StorageComponentProvider storageComponentProvider)
+ throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
+ mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
IndexOperation op = IndexOperation.INSERT;
IModificationOperationCallbackFactory modOpCallbackFactory =
- new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
+ new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(),
primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
ResourceType.LSM_BTREE);
IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
@@ -170,7 +179,7 @@
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), op,
true, indexHelperFactory, modOpCallbackFactory, null);
- CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
+ CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
@@ -204,8 +213,7 @@
}
public JobId newJobId() {
- jobId = new JobId(jobCounter++);
- return jobId;
+ return new JobId(jobCounter++);
}
public IResourceFactory getPrimaryResourceFactory(IHyracksTaskContext ctx, PrimaryIndexInfo primaryIndexInfo,
@@ -225,18 +233,22 @@
}
public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
- ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
- int[] filterFields, IStorageComponentProvider storageComponentProvider, int[] primaryKeyIndexes,
- List<Integer> primaryKeyIndicators) throws AlgebricksException, HyracksDataException {
+ ARecordType metaType, int[] filterFields, IStorageComponentProvider storageComponentProvider,
+ int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
+ throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators);
+ mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
Dataverse dataverse = new Dataverse(dataset.getDataverseName(), NonTaggedDataFormat.class.getName(),
MetadataUtil.PENDING_NO_OP);
MetadataProvider mdProvider = new MetadataProvider(
(ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse);
try {
IResourceFactory resourceFactory = dataset.getResourceFactory(mdProvider, primaryIndexInfo.index,
- recordType, metaType, mergePolicyFactory, mergePolicyProperties);
+ recordType, metaType, mergePolicy.first, mergePolicy.second);
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageComponentProvider.getStorageManager(),
primaryIndexInfo.getFileSplitProvider(), resourceFactory, !dataset.isTemp());
@@ -283,6 +295,10 @@
if (withMessaging) {
TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
}
+ JobId jobId = newJobId();
+ IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+ Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
+ Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
ctx = Mockito.spy(ctx);
Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
@@ -410,4 +426,84 @@
(CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
return appCtx.getStorageManager();
}
+
+ public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeline(IHyracksTaskContext ctx,
+ Dataset dataset, IAType[] keyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
+ int[] keyIndexes, List<Integer> keyIndicators, StorageComponentProvider storageComponentProvider,
+ IFrameOperationCallbackFactory frameOpCallbackFactory, boolean hasSecondaries) throws Exception {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, keyTypes, recordType, metaType,
+ mergePolicy.first, mergePolicy.second, filterFields, keyIndexes, keyIndicators);
+ IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+ storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+ ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+ storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+ IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+ IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+ storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+ LSMPrimaryUpsertOperatorNodePushable insertOp = new LSMPrimaryUpsertOperatorNodePushable(ctx, PARTITION,
+ indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations,
+ recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0),
+ modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, recordType, -1,
+ frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
+ MissingWriterFactory.INSTANCE, hasSecondaries);
+ RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
+ filterFields == null ? 0 : filterFields.length, recordType, metaType);
+ // fix pk fields
+ int diff = upsertOutRecDesc.getFieldCount() - primaryIndexInfo.rDesc.getFieldCount();
+ int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
+ for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
+ pkFieldsInCommitOp[i] = diff + i;
+ }
+ CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
+ false, true, PARTITION, true);
+ insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc);
+ commitOp.setInputRecordDescriptor(0, upsertOutRecDesc);
+ return Pair.of(insertOp, commitOp);
+ }
+
+ private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields,
+ ARecordType itemType, ARecordType metaItemType) throws Exception {
+ ITypeTraits[] outputTypeTraits =
+ new ITypeTraits[inputRecordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
+ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+ // add the previous record first
+ int f = 0;
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+ f++;
+ // add the previous meta second
+ if (dataset.hasMetaPart()) {
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
+ outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+ f++;
+ }
+ // add the previous filter third
+ int fieldIdx = -1;
+ if (numFilterFields > 0) {
+ String filterField = DatasetUtil.getFilterField(dataset).get(0);
+ String[] fieldNames = itemType.getFieldNames();
+ int i = 0;
+ for (; i < fieldNames.length; i++) {
+ if (fieldNames[i].equals(filterField)) {
+ break;
+ }
+ }
+ fieldIdx = i;
+ outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
+ .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
+ .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ f++;
+ }
+ for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
+ outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
+ outputSerDes[j + f] = inputRecordDesc.getFields()[j];
+ }
+ return new RecordDescriptor(outputSerDes, outputTypeTraits);
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 8d21b55..c50a4a2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.test.active;
-import org.apache.asterix.active.SingleThreadEventProcessor;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
public class Actor extends SingleThreadEventProcessor<Action> {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 76bec8c..36cb4bb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -132,12 +132,12 @@
public void createIndex() throws Exception {
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
- NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
partitioningKeys, null, null, null, false, null, false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
- PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
- new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+ PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
IndexDataflowHelperFactory iHelperFactory =
new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
ctx = nc.createTestContext(false);
@@ -146,9 +146,9 @@
lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
indexDataflowHelper.close();
nc.newJobId();
- txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
- insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
- null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+ txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+ insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, storageManager).getLeft();
}
@After
@@ -174,7 +174,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -224,7 +224,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -254,9 +254,9 @@
// insert again
nc.newJobId();
- txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
- insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
- null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
+ txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+ insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, storageManager).getLeft();
insertOp.open();
for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
ITupleReference tuple = tupleGenerator.next();
@@ -291,7 +291,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -360,7 +360,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -416,7 +416,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -481,7 +481,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -541,7 +541,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -603,7 +603,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -675,7 +675,7 @@
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index a0e3aa9..82eb16a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -110,20 +110,19 @@
StorageComponentProvider storageManager = new StorageComponentProvider();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
- NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
partitioningKeys, null, null, null, false, null, false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
- new NoMergePolicyFactory(), null, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+ PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
IHyracksTaskContext ctx = nc.createTestContext(true);
nc.newJobId();
- ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
- LSMInsertDeleteOperatorNodePushable insertOp = nc
- .getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
- null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager)
- .getLeft();
+ ITransactionContext txnCtx =
+ nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+ LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager).getLeft();
insertOp.open();
TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 41b3d38..5384c92 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -119,21 +119,20 @@
nc.init();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
- NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
partitioningKeys, null, null, null, false, null, false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
- null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+ nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
+ KEY_INDICATOR_LIST);
IHyracksTaskContext ctx = nc.createTestContext(false);
nc.newJobId();
- ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ ITransactionContext txnCtx =
+ nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp = nc
- .getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
- null, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
- .getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();
insertOp.open();
TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
index 58697a9..eb47248 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -119,26 +119,23 @@
StorageComponentProvider storageManager = new StorageComponentProvider();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset =
- new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null,
- null,
- new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null,
- null, false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0);
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ partitioningKeys, null, null, null, false, null, false),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
- null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+ nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
+ KEY_INDICATOR_LIST);
IHyracksTaskContext ctx = nc.createTestContext(false);
nc.newJobId();
- ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ ITransactionContext txnCtx =
+ nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp =
- nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
- new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
- .getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft();
insertOp.open();
- TupleGenerator tupleGenerator =
- new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION,
- UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
// Insert records until disk becomes full
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 895cd55..6d3b6c2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -229,7 +229,6 @@
public static final int ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED = 3097;
public static final int CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER = 3098;
public static final int DOUBLE_INITIALIZATION_OF_ACTIVE_NOTIFICATION_HANDLER = 3099;
- public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 3100;
public static final int DOUBLE_RECOVERY_ATTEMPTS = 3101;
public static final int UNREPORTED_TASK_FAILURE_EXCEPTION = 3102;
public static final int ACTIVE_ENTITY_ALREADY_SUSPENDED = 3103;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 98b8e2f..e428721 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -218,7 +218,6 @@
3097 = Active Entity %1$s has not been registered
3098 = Cannot deregister %1$s because it is active
3099 = Attempt to initialize an initialized Active Notification Handler
-3100 = Failed to shutdown event processor for %1$s
3101 = Recovery request while recovery is currently ongoing
3102 = Unreported exception causing task failure
3103 = %1$s is already suspended and has state %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index a071345..42da1d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -133,6 +133,10 @@
public static final int ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT = 97;
public static final int A_FLUSH_OPERATION_HAS_FAILED = 98;
public static final int A_MERGE_OPERATION_HAS_FAILED = 99;
+ public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 100;
+ public static final int PAGE_DOES_NOT_EXIST_IN_FILE = 101;
+ public static final int VBC_ALREADY_OPEN = 102;
+ public static final int VBC_ALREADY_CLOSED = 103;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
similarity index 91%
rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
index de6682d..21965c7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.active;
+package org.apache.hyracks.api.util;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class SingleThreadEventProcessor<T> implements Runnable {
@@ -75,7 +74,7 @@
LOGGER.log(Level.WARNING,
"Failed to stop event processor after " + attempt + " attempts. Interrupted exception swallowed?");
if (attempt == 10) {
- throw new RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+ throw HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
}
executorThread.interrupt();
executorThread.join(1000);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index ae579b4..bc83f8c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -116,5 +116,9 @@
97 = Illegal attempt to exit empty component
98 = A flush operation has failed
99 = A merge operation has failed
+100 = Failed to shutdown event processor for %1$s
+101 = Page %1$s does not exist in file %2$s
+102 = Failed to open virtual buffer cache since it is already open
+103 = Failed to close virtual buffer cache since it is already closed
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 77f18ea..05417a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,19 +25,6 @@
protected int length;
- /**
- * copies the content of this pointable to the passed byte array.
- * the array is expected to be at least of length = length of this pointable
- *
- * @param copy
- * the array to write into
- * @throws ArrayIndexOutOfBoundsException
- * if the passed array size is smaller than length
- */
- public void copyInto(byte[] copy) {
- System.arraycopy(bytes, start, copy, 0, length);
- }
-
@Override
public void set(byte[] bytes, int start, int length) {
this.bytes = bytes;
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
new file mode 100644
index 0000000..5de0b84
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class DataUtils {
+
+ private DataUtils() {
+ }
+
+ /**
+ * Copies the content of this pointable to the passed byte array.
+ * the array is expected to be at least of length = length of this pointable
+ *
+ * @param value
+ * the value to be copied
+ * @param copy
+ * the array to write into
+ * @throws ArrayIndexOutOfBoundsException
+ * if the passed array size is smaller than length
+ */
+ public static void copyInto(IValueReference value, byte[] copy) {
+ System.arraycopy(value.getByteArray(), value.getStartOffset(), copy, 0, value.getLength());
+ }
+
+ /**
+ * Copies the content of this pointable to the passed byte array.
+ * the array is expected to be at least of length = offset + length of this pointable
+ *
+ * @param value
+ * the value to be copied
+ * @param copy
+ * the array to write into
+ * @param offset
+ * the offset to start writing from
+ * @throws ArrayIndexOutOfBoundsException
+ * if the passed array size - offset is smaller than length
+ */
+ public static void copyInto(IValueReference value, byte[] copy, int offset) {
+ System.arraycopy(value.getByteArray(), value.getStartOffset(), copy, offset, value.getLength());
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 5e3042e..1fbcbb0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -1135,7 +1135,7 @@
((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
int finalPageId = freePageManager.takePage(metaFrame);
- bufferCache.setPageDiskId(frontier.page, BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
+ frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
pagesToWrite.add(frontier.page);
splitKey.setLeftPage(finalPageId);
@@ -1156,7 +1156,7 @@
if (level < 1) {
ICachedPage lastLeaf = nodeFrontiers.get(level).page;
int lastLeafPage = nodeFrontiers.get(level).pageId;
- setPageDpid(lastLeaf, nodeFrontiers.get(level).pageId);
+ lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId));
queue.put(lastLeaf);
nodeFrontiers.get(level).page = null;
persistFrontiers(level + 1, lastLeafPage);
@@ -1171,7 +1171,7 @@
}
((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
int finalPageId = freePageManager.takePage(metaFrame);
- setPageDpid(frontier.page, finalPageId);
+ frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
queue.put(frontier.page);
frontier.pageId = finalPageId;
@@ -1193,10 +1193,6 @@
public void abort() throws HyracksDataException {
super.handleException();
}
-
- private void setPageDpid(ICachedPage page, int pageId) {
- bufferCache.setPageDiskId(page, BufferedFileHandle.getDiskPageId(getFileId(), pageId));
- }
}
@SuppressWarnings("rawtypes")
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 1ee48f6..a051364 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -219,7 +219,7 @@
confiscatedPage.releaseWriteLatch(false);
}
int finalMetaPage = getMaxPageId(metaFrame) + 1;
- bufferCache.setPageDiskId(confiscatedPage, BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
+ confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
queue.put(confiscatedPage);
bufferCache.finishQueue();
metadataPage = getMetadataPageId();
@@ -345,8 +345,10 @@
try {
frame.setPage(page);
int inPageOffset = frame.getOffset(key);
- return inPageOffset >= 0 ? ((long) pageId * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
- + IBufferCache.RESERVED_HEADER_BYTES : -1L;
+ return inPageOffset >= 0
+ ? ((long) pageId * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
+ + IBufferCache.RESERVED_HEADER_BYTES
+ : -1L;
} finally {
page.releaseReadLatch();
unpinPage(page);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 686fe78..d8afd12 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -195,7 +195,7 @@
metaFrame.setMaxPage(1);
} finally {
metaNode.releaseWriteLatch(true);
- bufferCache.flushDirtyPage(metaNode);
+ bufferCache.flush(metaNode);
bufferCache.unpin(metaNode);
}
int rootPage = getRootPageId();
@@ -207,7 +207,7 @@
leafFrame.initBuffer((byte) 0);
} finally {
rootNode.releaseWriteLatch(true);
- bufferCache.flushDirtyPage(rootNode);
+ bufferCache.flush(rootNode);
bufferCache.unpin(rootNode);
}
}
@@ -249,7 +249,7 @@
metaFrame.setValid(true);
} finally {
metaNode.releaseWriteLatch(true);
- bufferCache.flushDirtyPage(metaNode);
+ bufferCache.flush(metaNode);
bufferCache.unpin(metaNode);
ready = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index ae66402..f03a358 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -342,4 +342,9 @@
public boolean hasMemoryComponents() {
return true;
}
+
+ @Override
+ public String toString() {
+ return "{\"class\":\"" + getClass().getSimpleName() + "\",\"file\":\"" + file.getRelativePath() + "\"}";
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 280cc52..0b59e91 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -197,4 +197,9 @@
.addBulkLoader(createIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex));
return chainedBulkLoader;
}
+
+ @Override
+ public String toString() {
+ return "{\"class\":" + getClass().getSimpleName() + "\", \"index\":" + getIndex().toString() + "}";
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index a4fe35c..57db635 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -255,6 +255,6 @@
@Override
public long getSize() {
IBufferCache virtualBufferCache = getIndex().getBufferCache();
- return virtualBufferCache.getNumPages() * (long) virtualBufferCache.getPageSize();
+ return virtualBufferCache.getPageBudget() * (long) virtualBufferCache.getPageSize();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
index b7d2ea3..d1244ce 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
@@ -45,7 +45,7 @@
@Override
public IValueReference get(IValueReference key) throws HyracksDataException {
- IPointable value = VoidPointable.FACTORY.createPointable();
+ VoidPointable value = VoidPointable.FACTORY.createPointable();
get(key, value);
return value;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
index dcc9355..1b827b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
@@ -20,6 +20,8 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,6 +33,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
public class MemoryComponentMetadata implements IComponentMetadata {
+ private static final Logger LOGGER = Logger.getLogger(MemoryComponentMetadata.class.getName());
private static final byte[] empty = new byte[0];
private final List<org.apache.commons.lang3.tuple.Pair<IValueReference, ArrayBackedValueStorage>> store =
new ArrayList<>();
@@ -43,9 +46,9 @@
ArrayBackedValueStorage stored = get(key);
if (stored == null) {
stored = new ArrayBackedValueStorage();
+ store.add(Pair.of(key, stored));
}
stored.assign(value);
- store.add(Pair.of(key, stored));
}
/**
@@ -71,8 +74,12 @@
}
public void copy(IMetadataPageManager mdpManager) throws HyracksDataException {
+ LOGGER.log(Level.INFO, "Copying Metadata into a different component");
ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame();
for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, "Copying " + pair.getKey() + " : " + pair.getValue().getLength() + " bytes");
+ }
mdpManager.put(frame, pair.getKey(), pair.getValue());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 83e140c..7a3d58b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -20,8 +20,6 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -34,7 +32,6 @@
import org.apache.hyracks.util.JSONUtil;
public class MultitenantVirtualBufferCache implements IVirtualBufferCache {
- private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName());
private final IVirtualBufferCache vbc;
private int openCount;
@@ -65,11 +62,6 @@
}
@Override
- public ICachedPage tryPin(long dpid) throws HyracksDataException {
- return vbc.tryPin(dpid);
- }
-
- @Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
return vbc.pin(dpid, newPage);
}
@@ -80,8 +72,8 @@
}
@Override
- public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
- vbc.flushDirtyPage(page);
+ public void flush(ICachedPage page) throws HyracksDataException {
+ vbc.flush(page);
}
@Override
@@ -100,8 +92,8 @@
}
@Override
- public int getNumPages() {
- return vbc.getNumPages();
+ public int getPageBudget() {
+ return vbc.getPageBudget();
}
@Override
@@ -141,14 +133,6 @@
}
@Override
- public void adviseWontNeed(ICachedPage page) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "Calling adviseWontNeed on " + this.getClass().getName()
- + " makes no sense as this BufferCache cannot evict pages");
- }
- }
-
- @Override
public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
return vbc.confiscatePage(dpid);
}
@@ -175,11 +159,6 @@
}
@Override
- public void setPageDiskId(ICachedPage page, long dpid) {
-
- }
-
- @Override
public void returnPage(ICachedPage page, boolean reinsert) {
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
index 3195f57..8ce636b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
@@ -31,13 +31,13 @@
public class NoMergePolicyFactory implements ILSMMergePolicyFactory {
private static final long serialVersionUID = 1L;
-
private static final String[] SET_VALUES = new String[] {};
private static final Set<String> PROPERTIES_NAMES = new HashSet<>(Arrays.asList(SET_VALUES));
+ public static final String NAME = "no-merge";
@Override
public String getName() {
- return "no-merge";
+ return NAME;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index 83e377a..3a22793 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -19,15 +19,16 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.replication.IIOReplicationManager;
@@ -38,42 +39,71 @@
import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
import org.apache.hyracks.storage.common.buffercache.VirtualPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
-import org.apache.hyracks.storage.common.file.IFileMapManager;
import org.apache.hyracks.storage.common.file.FileMapManager;
+import org.apache.hyracks.storage.common.file.IFileMapManager;
import org.apache.hyracks.util.JSONUtil;
public class VirtualBufferCache implements IVirtualBufferCache {
- private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName());
-
- private static final int OVERFLOW_PADDING = 8;
+ private static final Logger LOGGER = Logger.getLogger(VirtualBufferCache.class.getName());
private final ICacheMemoryAllocator allocator;
private final IFileMapManager fileMapManager;
private final int pageSize;
- private final int numPages;
-
+ private final int pageBudget;
private final CacheBucket[] buckets;
- private final ArrayList<VirtualPage> pages;
-
- private volatile int nextFree;
+ private final BlockingQueue<VirtualPage> freePages;
private final AtomicInteger largePages;
-
+ private final AtomicInteger used;
private boolean open;
- public VirtualBufferCache(ICacheMemoryAllocator allocator, int pageSize, int numPages) {
+ public VirtualBufferCache(ICacheMemoryAllocator allocator, int pageSize, int pageBudget) {
this.allocator = allocator;
this.fileMapManager = new FileMapManager();
this.pageSize = pageSize;
- this.numPages = 2 * (numPages / 2) + 1;
-
- buckets = new CacheBucket[this.numPages];
- pages = new ArrayList<>();
- nextFree = 0;
+ if (pageBudget == 0) {
+ throw new IllegalArgumentException("Page Budget Cannot be 0");
+ }
+ this.pageBudget = pageBudget;
+ buckets = new CacheBucket[this.pageBudget];
+ freePages = new ArrayBlockingQueue<>(this.pageBudget);
largePages = new AtomicInteger(0);
+ used = new AtomicInteger(0);
open = false;
}
@Override
+ public int getPageSize() {
+ return pageSize;
+ }
+
+ @Override
+ public int getPageSizeWithHeader() {
+ return pageSize;
+ }
+
+ public int getLargePages() {
+ return largePages.get();
+ }
+
+ public int getUsage() {
+ return used.get();
+ }
+
+ public int getPreAllocatedPages() {
+ return freePages.size();
+ }
+
+ @Override
+ public int getPageBudget() {
+ return pageBudget;
+ }
+
+ @Override
+ public boolean isFull() {
+ return used.get() >= pageBudget;
+ }
+
+ @Override
public int createFile(FileReference fileRef) throws HyracksDataException {
synchronized (fileMapManager) {
return fileMapManager.registerFile(fileRef);
@@ -82,16 +112,28 @@
@Override
public int openFile(FileReference fileRef) throws HyracksDataException {
- synchronized (fileMapManager) {
- if (fileMapManager.isMapped(fileRef)) {
- return fileMapManager.lookupFileId(fileRef);
+ try {
+ synchronized (fileMapManager) {
+ if (fileMapManager.isMapped(fileRef)) {
+ return fileMapManager.lookupFileId(fileRef);
+ }
+ return fileMapManager.registerFile(fileRef);
}
- return fileMapManager.registerFile(fileRef);
+ } finally {
+ logStats();
+ }
+ }
+
+ private void logStats() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, "Free (allocated) pages = " + freePages.size() + ". Budget = " + pageBudget
+ + ". Large pages = " + largePages.get() + ". Overall usage = " + used.get());
}
}
@Override
public void openFile(int fileId) throws HyracksDataException {
+ logStats();
}
@Override
@@ -111,6 +153,7 @@
synchronized (fileMapManager) {
fileMapManager.unregisterFile(fileId);
}
+ int reclaimedPages = 0;
for (int i = 0; i < buckets.length; i++) {
final CacheBucket bucket = buckets[i];
bucket.bucketLock.lock();
@@ -119,16 +162,20 @@
VirtualPage curr = bucket.cachedPage;
while (curr != null) {
if (BufferedFileHandle.getFileId(curr.dpid()) == fileId) {
- if (curr.getFrameSizeMultiplier() > 1) {
+ reclaimedPages++;
+ if (curr.isLargePage()) {
largePages.getAndAdd(-curr.getFrameSizeMultiplier());
+ used.addAndGet(-curr.getFrameSizeMultiplier());
+ } else {
+ used.decrementAndGet();
}
if (prev == null) {
bucket.cachedPage = curr.next();
- curr.reset();
+ recycle(curr);
curr = bucket.cachedPage;
} else {
prev.next(curr.next());
- curr.reset();
+ recycle(curr);
curr = prev.next();
}
} else {
@@ -140,54 +187,27 @@
bucket.bucketLock.unlock();
}
}
- defragPageList();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, "Reclaimed pages = " + reclaimedPages);
+ }
+ logStats();
}
- private void defragPageList() {
- synchronized (pages) {
- int start = 0;
- int end = nextFree - 1;
- while (start < end) {
- VirtualPage lastUsed = pages.get(end);
- while (end > 0 && lastUsed.dpid() == -1) {
- --end;
- lastUsed = pages.get(end);
- }
-
- if (end == 0) {
- nextFree = lastUsed.dpid() == -1 ? 0 : 1;
- break;
- }
-
- VirtualPage firstUnused = pages.get(start);
- while (start < end && firstUnused.dpid() != -1) {
- ++start;
- firstUnused = pages.get(start);
- }
-
- if (start >= end) {
- break;
- }
-
- Collections.swap(pages, start, end);
- nextFree = end;
- --end;
- ++start;
- }
+ private void recycle(VirtualPage page) {
+ // recycle only if
+ // 1. not a large page
+ // 2. allocation is not above budget
+ if (used.get() < pageBudget && !page.isLargePage()) {
+ page.reset();
+ freePages.offer(page);
}
}
@Override
- public ICachedPage tryPin(long dpid) throws HyracksDataException {
- return pin(dpid, false);
- }
-
- @Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
- VirtualPage page = null;
+ VirtualPage page;
int hash = hash(dpid);
CacheBucket bucket = buckets[hash];
-
bucket.bucketLock.lock();
try {
page = bucket.cachedPage;
@@ -197,15 +217,15 @@
}
page = page.next();
}
-
if (!newPage) {
+ int fileId = BufferedFileHandle.getFileId(dpid);
+ FileReference fileRef;
synchronized (fileMapManager) {
- throw new HyracksDataException(
- "Page " + BufferedFileHandle.getPageId(dpid) + " does not exist in file "
- + fileMapManager.lookupFileName(BufferedFileHandle.getFileId(dpid)));
+ fileRef = fileMapManager.lookupFileName(fileId);
}
+ throw HyracksDataException.create(ErrorCode.PAGE_DOES_NOT_EXIST_IN_FILE,
+ BufferedFileHandle.getPageId(dpid), fileRef);
}
-
page = getOrAllocPage(dpid);
page.next(bucket.cachedPage);
bucket.cachedPage = page;
@@ -222,18 +242,13 @@
}
private VirtualPage getOrAllocPage(long dpid) {
- VirtualPage page;
- synchronized (pages) {
- if (nextFree >= pages.size()) {
- page = new VirtualPage(allocator.allocate(pageSize, 1)[0], pageSize);
- page.multiplier(1);
- pages.add(page);
- } else {
- page = pages.get(nextFree);
- }
- ++nextFree;
- page.dpid(dpid);
+ VirtualPage page = freePages.poll();
+ if (page == null) {
+ page = new VirtualPage(allocator.allocate(pageSize, 1)[0], pageSize);
+ page.multiplier(1);
}
+ page.dpid(dpid);
+ used.incrementAndGet();
return page;
}
@@ -245,10 +260,26 @@
// no-op
return;
}
+ // Maintain counters
+ // In addition, discard pre-allocated pages as the multiplier of the large page
+ // This is done before actual resizing in order to allow GC for the same budget out of
+ // the available free pages first
if (origMultiplier == 1) {
- synchronized (pages) {
- pages.remove(cPage);
- nextFree--;
+ largePages.getAndAdd(multiplier);
+ int diff = multiplier - 1;
+ used.getAndAdd(diff);
+ for (int i = 0; i < diff; i++) {
+ freePages.poll();
+ }
+ } else if (multiplier == 1) {
+ largePages.getAndAdd(-origMultiplier);
+ used.addAndGet(-origMultiplier + 1);
+ } else {
+ int diff = multiplier - origMultiplier;
+ largePages.getAndAdd(diff);
+ used.getAndAdd(diff);
+ for (int i = 0; i < diff; i++) {
+ freePages.poll();
}
}
ByteBuffer newBuffer = allocator.allocate(pageSize * multiplier, 1)[0];
@@ -257,15 +288,6 @@
oldBuffer.limit(newBuffer.capacity());
}
newBuffer.put(oldBuffer);
- if (origMultiplier == 1) {
- largePages.getAndAdd(multiplier);
- } else if (multiplier == 1) {
- largePages.getAndAdd(-origMultiplier);
- pages.add(0, (VirtualPage) cPage);
- nextFree++;
- } else {
- largePages.getAndAdd(multiplier - origMultiplier);
- }
((VirtualPage) cPage).buffer(newBuffer);
((VirtualPage) cPage).multiplier(multiplier);
}
@@ -275,7 +297,8 @@
}
@Override
- public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+ public void flush(ICachedPage page) throws HyracksDataException {
+ throw new UnsupportedOperationException();
}
@Override
@@ -283,59 +306,50 @@
}
@Override
- public int getPageSize() {
- return pageSize;
- }
-
- @Override
- public int getPageSizeWithHeader() {
- return pageSize;
- }
-
- @Override
- public int getNumPages() {
- return numPages;
- }
-
- @Override
public void open() throws HyracksDataException {
if (open) {
- throw new HyracksDataException("Failed to open virtual buffercache since it is already open.");
+ throw HyracksDataException.create(ErrorCode.VBC_ALREADY_OPEN);
}
- pages.trimToSize();
- pages.ensureCapacity(numPages + OVERFLOW_PADDING);
- allocator.reserveAllocation(pageSize, numPages);
- for (int i = 0; i < numPages; i++) {
+ allocator.reserveAllocation(pageSize, pageBudget);
+ for (int i = 0; i < pageBudget; i++) {
buckets[i] = new CacheBucket();
}
- nextFree = 0;
largePages.set(0);
+ used.set(0);
open = true;
}
@Override
public void reset() {
- for (int i = 0; i < numPages; i++) {
- buckets[i].cachedPage = null;
- }
- int excess = pages.size() - numPages;
- if (excess > 0) {
- for (int i = numPages + excess - 1; i >= numPages; i--) {
- pages.remove(i);
+ recycleAllPages();
+ used.set(0);
+ largePages.set(0);
+ }
+
+ private void recycleAllPages() {
+ for (int i = 0; i < buckets.length; i++) {
+ final CacheBucket bucket = buckets[i];
+ bucket.bucketLock.lock();
+ try {
+ VirtualPage curr = bucket.cachedPage;
+ while (curr != null) {
+ bucket.cachedPage = curr.next();
+ recycle(curr);
+ curr = bucket.cachedPage;
+ }
+ } finally {
+ bucket.bucketLock.unlock();
}
}
- nextFree = 0;
- largePages.set(0);
}
@Override
public void close() throws HyracksDataException {
if (!open) {
- throw new HyracksDataException("Failed to close virtual buffercache since it is already closed.");
+ throw HyracksDataException.create(ErrorCode.VBC_ALREADY_CLOSED);
}
-
- pages.clear();
- for (int i = 0; i < numPages; i++) {
+ freePages.clear();
+ for (int i = 0; i < pageBudget; i++) {
buckets[i].cachedPage = null;
}
open = false;
@@ -343,11 +357,11 @@
public String dumpState() {
StringBuilder sb = new StringBuilder();
- sb.append(String.format("Page size = %d\n", pageSize));
- sb.append(String.format("Capacity = %d\n", numPages));
- sb.append(String.format("Allocated pages = %d\n", pages.size()));
- sb.append(String.format("Allocated large pages = %d\n", largePages.get()));
- sb.append(String.format("Next free page = %d\n", nextFree));
+ sb.append(String.format("Page size = %d%n", pageSize));
+ sb.append(String.format("Page budget = %d%n", pageBudget));
+ sb.append(String.format("Used pages = %d%n", used.get()));
+ sb.append(String.format("Used large pages = %d%n", largePages.get()));
+ sb.append(String.format("Available free pages = %d%n", freePages.size()));
return sb.toString();
}
@@ -356,11 +370,6 @@
return fileMapManager;
}
- @Override
- public boolean isFull() {
- return (nextFree + largePages.get()) >= numPages;
- }
-
private static class CacheBucket {
private final ReentrantLock bucketLock;
private VirtualPage cachedPage;
@@ -376,14 +385,6 @@
}
@Override
- public void adviseWontNeed(ICachedPage page) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "Calling adviseWontNeed on " + this.getClass().getName()
- + " makes no sense as this BufferCache cannot evict pages");
- }
- }
-
- @Override
public void returnPage(ICachedPage page) {
}
@@ -410,11 +411,6 @@
}
@Override
- public void setPageDiskId(ICachedPage page, long dpid) {
-
- }
-
- @Override
public void returnPage(ICachedPage page, boolean reinsert) {
throw new UnsupportedOperationException("Virtual buffer caches don't have FIFO writers");
}
@@ -449,7 +445,7 @@
map.put("class", getClass().getSimpleName());
map.put("allocator", allocator.toString());
map.put("pageSize", pageSize);
- map.put("numPages", numPages);
+ map.put("pageBudget", pageBudget);
map.put("open", open);
return map;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index cf40a7a..77dc751 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -147,7 +147,7 @@
@Override
public long getMemoryAllocationSize() {
IBufferCache virtualBufferCache = btree.getBufferCache();
- return virtualBufferCache.getNumPages() * virtualBufferCache.getPageSize();
+ return (long) virtualBufferCache.getPageBudget() * virtualBufferCache.getPageSize();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index ee7c827..750a2fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -762,7 +762,7 @@
}
public RTreeAccessor createAccessor(IModificationOperationCallback modificationCallback,
- ISearchOperationCallback searchCallback, int[] nonIndexFields) {
+ ISearchOperationCallback searchCallback, int[] nonIndexFields) {
return new RTreeAccessor(this, modificationCallback, searchCallback, nonIndexFields);
}
@@ -1008,7 +1008,7 @@
int finalPageId = freePageManager.takePage(metaFrame);
n.pageId = finalPageId;
- bufferCache.setPageDiskId(n.page, BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
+ n.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
//else we are looking at a leaf
}
//set next guide MBR
@@ -1071,9 +1071,8 @@
} else {
prevNodeFrontierPages.set(level, finalPageId);
}
- bufferCache.setPageDiskId(frontier.page, BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
+ frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
pagesToWrite.add(frontier.page);
-
lowerFrame = prevInteriorFrame;
lowerFrame.setPage(frontier.page);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index fcea8e0..d0f4965 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -138,7 +138,7 @@
}
@Override
- public int getNumPages() {
+ public int getPageBudget() {
return pageReplacementStrategy.getMaxAllowedNumPages();
}
@@ -161,33 +161,6 @@
}
@Override
- public ICachedPage tryPin(long dpid) throws HyracksDataException {
- // Calling the pinSanityCheck should be used only for debugging, since
- // the synchronized block over the fileInfoMap is a hot spot.
- if (DEBUG) {
- pinSanityCheck(dpid);
- }
- CachedPage cPage = null;
- int hash = hash(dpid);
- CacheBucket bucket = pageMap[hash];
- bucket.bucketLock.lock();
- try {
- cPage = bucket.cachedPage;
- while (cPage != null) {
- if (cPage.dpid == dpid) {
- cPage.pinCount.incrementAndGet();
- pageReplacementStrategy.notifyCachePageAccess(cPage);
- return cPage;
- }
- cPage = cPage.next;
- }
- } finally {
- bucket.bucketLock.unlock();
- }
- return cPage;
- }
-
- @Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
// Calling the pinSanityCheck should be used only for debugging, since
// the synchronized block over the fileInfoMap is a hot spot.
@@ -978,7 +951,7 @@
}
@Override
- public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+ public void flush(ICachedPage page) throws HyracksDataException {
// Assumes the caller has pinned the page.
cleanerThread.cleanPage((CachedPage) page, true);
}
@@ -1177,11 +1150,6 @@
}
@Override
- public void adviseWontNeed(ICachedPage page) {
- pageReplacementStrategy.adviseWontNeed((ICachedPageInternal) page);
- }
-
- @Override
public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
return confiscatePage(dpid, 1);
}
@@ -1329,17 +1297,21 @@
finishQueue();
if (cycleCount > MAX_PIN_ATTEMPT_CYCLES) {
cycleCount = 0; // suppress warning below
- throw new HyracksDataException("Unable to find free page in buffer cache after "
- + MAX_PIN_ATTEMPT_CYCLES + " cycles (buffer cache undersized?)"
- + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount)
- + " successful pins since start of cycle" : ""));
+ throw new HyracksDataException(
+ "Unable to find free page in buffer cache after " + MAX_PIN_ATTEMPT_CYCLES
+ + " cycles (buffer cache undersized?)" + (DEBUG
+ ? " ; " + (masterPinCount.get() - startingPinCount)
+ + " successful pins since start of cycle"
+ : ""));
}
}
} finally {
if (cycleCount > PIN_ATTEMPT_CYCLES_WARNING_THRESHOLD && LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Took " + cycleCount + " cycles to find free page in buffer cache. (buffer cache "
- + "undersized?)" + (DEBUG ? " ; " + (masterPinCount.get() - startingPinCount)
- + " successful pins since start of cycle" : ""));
+ + "undersized?)" + (DEBUG
+ ? " ; " + (masterPinCount.get() - startingPinCount)
+ + " successful pins since start of cycle"
+ : ""));
}
}
}
@@ -1402,13 +1374,8 @@
}
@Override
- public void setPageDiskId(ICachedPage page, long dpid) {
- ((CachedPage) page).dpid = dpid;
- }
-
- @Override
public IFIFOPageQueue createFIFOQueue() {
- return fifoWriter.createQueue(FIFOLocalWriter.instance());
+ return fifoWriter.createQueue(FIFOLocalWriter.INSTANCE);
}
@Override
@@ -1430,10 +1397,6 @@
}
@Override
- /**
- * _ONLY_ call this if you absolutely, positively know this file has no dirty pages in the cache!
- * Bypasses the normal lifecycle of a file handle and evicts all references to it immediately.
- */
public void purgeHandle(int fileId) throws HyracksDataException {
synchronized (fileInfoMap) {
BufferedFileHandle fh = fileInfoMap.get(fileId);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 76bbd4c..bc0a04e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -46,7 +46,7 @@
private final StackTraceElement[] ctorStack;
//Constructor for making dummy entry for FIFO queue
- public CachedPage(){
+ public CachedPage() {
this.cpid = -1;
this.buffer = null;
this.pageReplacementStrategy = null;
@@ -55,7 +55,7 @@
pinCount = null;
queueInfo = null;
replacementStrategyObject = null;
- latch =null;
+ latch = null;
ctorStack = DEBUG ? new Throwable().getStackTrace() : null;
}
@@ -195,4 +195,14 @@
void setNext(CachedPage next) {
this.next = next;
}
+
+ @Override
+ public void setDiskPageId(long dpid) {
+ this.dpid = dpid;
+ }
+
+ @Override
+ public boolean isLargePage() {
+ return multiplier > 1;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
index 8f7a965..f3de1c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -77,11 +77,6 @@
}
@Override
- public ICachedPage tryPin(long dpid) throws HyracksDataException {
- return bufferCache.tryPin(dpid);
- }
-
- @Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
ICachedPage page = bufferCache.pin(dpid, newPage);
pinCount.addAndGet(1);
@@ -105,8 +100,8 @@
}
@Override
- public int getNumPages() {
- return bufferCache.getNumPages();
+ public int getPageBudget() {
+ return bufferCache.getPageBudget();
}
@Override
@@ -168,8 +163,8 @@
}
@Override
- public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
- bufferCache.flushDirtyPage(page);
+ public void flush(ICachedPage page) throws HyracksDataException {
+ bufferCache.flush(page);
}
@Override
@@ -183,11 +178,6 @@
}
@Override
- public void adviseWontNeed(ICachedPage page) {
- bufferCache.adviseWontNeed(page);
- }
-
- @Override
public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
return bufferCache.confiscatePage(dpid);
}
@@ -214,12 +204,6 @@
}
@Override
- public void setPageDiskId(ICachedPage page, long dpid) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
public void returnPage(ICachedPage page, boolean reinsert) {
// TODO Auto-generated method stub
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 6774ddd..9d0b728 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -18,19 +18,15 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class FIFOLocalWriter implements IFIFOPageWriter {
- private static FIFOLocalWriter instance;
+ public static final FIFOLocalWriter INSTANCE = new FIFOLocalWriter();
private static boolean DEBUG = false;
- public static FIFOLocalWriter instance() {
- if(instance == null) {
- instance = new FIFOLocalWriter();
- }
- return instance;
+ private FIFOLocalWriter() {
}
@Override
public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException {
- CachedPage cPage = (CachedPage)page;
+ CachedPage cPage = (CachedPage) page;
try {
bufferCache.write(cPage);
} finally {
@@ -43,6 +39,6 @@
@Override
public void sync(int fileId, BufferCache bufferCache) throws HyracksDataException {
- bufferCache.force(fileId,true);
+ bufferCache.force(fileId, true);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
index 789f7b7..28801ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java
@@ -88,50 +88,185 @@
*/
void deleteFile(FileReference file) throws HyracksDataException;
- ICachedPage tryPin(long dpid) throws HyracksDataException;
-
+ /**
+ * Pin the page so it can't be evicted from the buffer cache...
+ *
+ * @param dpid
+ * page id is a unique id that is a combination of file id and page id
+ * @param newPage
+ * whether this page is expected to be new.
+ * NOTE: undefined:
+ * -- what if the flag is true but the page exists?
+ * -- what if the flag is false but the page doesn't exist
+ * @return the pinned page
+ * @throws HyracksDataException
+ */
ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
+ /**
+ * Unpin a pinned page so its buffer can be recycled
+ *
+ * @param page
+ * the page
+ * @throws HyracksDataException
+ */
void unpin(ICachedPage page) throws HyracksDataException;
- void flushDirtyPage(ICachedPage page) throws HyracksDataException;
+ /**
+ * Flush the page if it is dirty
+ *
+ * @param page
+ * the page to flush
+ * @throws HyracksDataException
+ */
+ void flush(ICachedPage page) throws HyracksDataException;
- void adviseWontNeed(ICachedPage page);
-
- ICachedPage confiscatePage(long dpid) throws HyracksDataException;
-
- ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId) throws HyracksDataException;
-
- void returnPage(ICachedPage page);
-
- void returnPage(ICachedPage page, boolean reinsert);
-
+ /**
+ * Force bits that have been already flushed to disk
+ * This method doesn't flush all dirty pages to disk but simply calls the sync method on the filesystem api
+ *
+ * @param fileId
+ * the file id
+ * @param metadata
+ * whether metadata should be synced as well
+ * @throws HyracksDataException
+ */
void force(int fileId, boolean metadata) throws HyracksDataException;
+ /**
+ * Take a page such that no one else has access to it
+ *
+ * @param dpid
+ * the unique (fileId,pageId)
+ * @return the confiscated page or null if no page is available
+ * @throws HyracksDataException
+ */
+ ICachedPage confiscatePage(long dpid) throws HyracksDataException;
+
+ /**
+ *
+ * @return the confiscated page or null if no page is available
+ * @throws HyracksDataException
+ */
+ /**
+ * Take a large page such that no one else has access to it
+ *
+ * @param dpid
+ * the unique (fileId,pageId)
+ * @param multiplier
+ * how many multiples of the original page size
+ * @param extraBlockPageId
+ * the page id where the large block comes from
+ * @return
+ * the confiscated page or null if a large page couldn't be found
+ * @throws HyracksDataException
+ */
+ ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId) throws HyracksDataException;
+
+ /**
+ * Return and re-insert a confiscated page
+ *
+ * @param page
+ * the confiscated page
+ */
+ void returnPage(ICachedPage page);
+
+ /**
+ * Return a confiscated page
+ *
+ * @param page
+ * the confiscated page
+ * @param reinsert
+ * if true, return the page to the cache, otherwise, destroy
+ */
+ void returnPage(ICachedPage page, boolean reinsert);
+
+ /**
+ * Get the standard page size
+ *
+ * @return the size in bytes
+ */
int getPageSize();
+ /**
+ * Get the standard page size with header if any
+ *
+ * @return the sum of page size and header size in bytes
+ */
int getPageSizeWithHeader();
- int getNumPages();
+ /**
+ * @return the maximum allowed pages in this buffer cahce
+ */
+ int getPageBudget();
+ /**
+ * Get the number of pages used for a file
+ *
+ * @param fileId
+ * the file id
+ * @return the number of pages used for the file
+ * @throws HyracksDataException
+ */
int getNumPagesOfFile(int fileId) throws HyracksDataException;
+ /**
+ * Get the reference count for a file (num of open - num of close)
+ *
+ * @param fileId
+ * the file
+ * @return the reference count
+ */
int getFileReferenceCount(int fileId);
+ /**
+ * Close the buffer cache, all of its files, and release the memory taken by it
+ * The buffer cache is open upon successful instantiation and can't be re-opened
+ *
+ * @throws HyracksDataException
+ */
void close() throws HyracksDataException;
+ /**
+ * @return an instance of {@link IFIFOPageQueue} that can be used to write pages to the file
+ */
IFIFOPageQueue createFIFOQueue();
+ /**
+ * Flush the queued pages written through buffer cache FIFO queues
+ */
void finishQueue();
- void setPageDiskId(ICachedPage page, long dpid);
-
+ // TODO: remove the replication out of the buffer cache interface
+ /**
+ * @return true if replication is enabled, false otherwise
+ */
boolean isReplicationEnabled();
+ /**
+ * @return the io replication manager
+ */
IIOReplicationManager getIOReplicationManager();
+ /**
+ * Deletes the file and recycle all of its pages without flushing them.
+ *
+ * ONLY call this if you absolutely, positively know this file has no dirty pages in the cache!
+ * Bypasses the normal lifecycle of a file handle and evicts all references to it immediately.
+ */
void purgeHandle(int fileId) throws HyracksDataException;
+ /**
+ * Resize the page
+ *
+ * @param page
+ * the page to resize
+ * @param multiplier
+ * how many multiples of the original page size
+ * @param extraPageBlockHelper
+ * helper to determine the location of the resize block
+ * @throws HyracksDataException
+ */
void resizePage(ICachedPage page, int multiplier, IExtraPageBlockHelper extraPageBlockHelper)
throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
index abbe233..16837b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
@@ -41,4 +41,13 @@
int getPageSize();
int getFrameSizeMultiplier();
+
+ void setDiskPageId(long dpid);
+
+ /**
+ * Check if a page is a large page
+ *
+ * @return true if the page is large, false otherwise
+ */
+ boolean isLargePage();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java
index ad7f2f6..607385a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IExtraPageBlockHelper.java
@@ -21,6 +21,25 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IExtraPageBlockHelper {
+ /**
+ * Get the page id of the free block of size
+ *
+ * @param size
+ * the size of the block
+ * @return
+ * the page id
+ * @throws HyracksDataException
+ */
int getFreeBlock(int size) throws HyracksDataException;
+
+ /**
+ * Release the block at location blockPageId which has size size
+ *
+ * @param blockPageId
+ * the block page id
+ * @param size
+ * the size of the block
+ * @throws HyracksDataException
+ */
void returnFreePageBlock(int blockPageId, int size) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
index 0fe5767..6c03671 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
@@ -17,6 +17,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
+@FunctionalInterface
public interface IFIFOPageQueue {
- public void put(ICachedPage page) throws HyracksDataException;
+ void put(ICachedPage page) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
index cfca77a..139a3c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
@@ -121,4 +121,14 @@
this.buffer = buffer;
}
+ @Override
+ public void setDiskPageId(long dpid) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isLargePage() {
+ return multiplier > 1;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java
index 5ff5a11..0e749fc 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/test/VirtualBufferCacheTest.java
@@ -25,49 +25,139 @@
import java.util.Random;
import java.util.Set;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.junit.Assert;
import org.junit.Test;
public class VirtualBufferCacheTest {
- private static final long SEED = 123456789L;
- private static final int NUM_OVERPIN = 128;
- private static final int PAGE_SIZE = 256;
- private static final int NUM_FILES = 10;
- private static final int NUM_PAGES = 1000;
+ /*
+ * Missing tests:
+ * 0. concurrent pinnings for a single file from multiple threads
+ * 1. concurrent create file
+ * 2. file deletes while pages are pinned? Note that currently, the vbc doesn't keep track of number of pinnings
+ */
+ private static class TestExtraPageBlockHelper implements IExtraPageBlockHelper {
+ private final int fileId;
+ private int pinCount;
+ private Set<ICachedPage> pinnedPages;
+ private int totalNumPages;
- private final Random random;
- private final FileState[] fileStates;
-
- private VirtualBufferCache vbc;
- private IOManager ioManager;
-
- public VirtualBufferCacheTest() {
- fileStates = new FileState[NUM_FILES];
- for (int i = 0; i < NUM_FILES; i++) {
- fileStates[i] = new FileState();
+ public TestExtraPageBlockHelper(int fileId) {
+ this.fileId = fileId;
+ pinCount = 0;
+ pinnedPages = new HashSet<>();
}
- random = new Random(SEED);
- vbc = null;
+
+ @Override
+ public int getFreeBlock(int size) throws HyracksDataException {
+ int before = totalNumPages;
+ totalNumPages += size - 1;
+ return before;
+ }
+
+ @Override
+ public void returnFreePageBlock(int blockPageId, int size) throws HyracksDataException {
+ // Do nothing. we don't reclaim large pages from file in this test
+ }
+
+ public void pin(VirtualBufferCache vbc, int multiplier) throws HyracksDataException {
+ ICachedPage p = vbc.pin(BufferedFileHandle.getDiskPageId(fileId, pinCount), true);
+ pinnedPages.add(p);
+ pinCount++;
+ totalNumPages++;
+ if (multiplier > 1) {
+ vbc.resizePage(p, multiplier, this);
+ }
+ }
+
}
private static class FileState {
- private int fileId;
+ private final VirtualBufferCache vbc;
+ private final int fileId;
+ private final TestExtraPageBlockHelper helper;
private FileReference fileRef;
- private int pinCount;
- private Set<ICachedPage> pinnedPages;
- public FileState() {
- fileId = -1;
- fileRef = null;
- pinCount = 0;
- pinnedPages = new HashSet<>();
+ public FileState(VirtualBufferCache vbc, String fileName) throws HyracksDataException {
+ this.vbc = vbc;
+ IOManager ioManager = TestStorageManagerComponentHolder.getIOManager();
+ fileRef = ioManager.resolve(fileName);
+ vbc.createFile(fileRef);
+ fileId = vbc.getFileMapProvider().lookupFileId(fileRef);
+ helper = new TestExtraPageBlockHelper(fileId);
+ }
+
+ public void pin(int multiplier) throws HyracksDataException {
+ helper.pin(vbc, multiplier);
+ }
+ }
+
+ private static class Request {
+ private enum Type {
+ PIN_PAGE,
+ CALLBACK
+ }
+
+ private final Type type;
+ private boolean done;
+
+ public Request(Type type) {
+ this.type = type;
+ done = false;
+ }
+
+ Type getType() {
+ return type;
+ }
+
+ synchronized void complete() {
+ done = true;
+ notifyAll();
+ }
+
+ synchronized void await() throws InterruptedException {
+ while (!done) {
+ wait();
+ }
+ }
+ }
+
+ public class User extends SingleThreadEventProcessor<Request> {
+ private final VirtualBufferCache vbc;
+ private final FileState fileState;
+
+ public User(String name, VirtualBufferCache vbc, FileState fileState) throws HyracksDataException {
+ super(name);
+ this.vbc = vbc;
+ this.fileState = fileState;
+ }
+
+ @Override
+ protected void handle(Request req) throws Exception {
+ try {
+ switch (req.getType()) {
+ case PIN_PAGE:
+ ICachedPage p = vbc.pin(
+ BufferedFileHandle.getDiskPageId(fileState.fileId, fileState.helper.pinCount), true);
+ fileState.helper.pinnedPages.add(p);
+ ++fileState.helper.pinCount;
+ break;
+ default:
+ break;
+ }
+ } finally {
+ req.complete();
+ }
}
}
@@ -79,60 +169,184 @@
* of pages.
*/
@Test
- public void test01() throws Exception {
- TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, NUM_FILES);
- ioManager = TestStorageManagerComponentHolder.getIOManager();
+ public void testDisjointPins() throws Exception {
+ final int numOverpin = 128;
+ final int pageSize = 256;
+ final int numFiles = 10;
+ final int numPages = 1000;
+ Random random = new Random();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
- vbc = new VirtualBufferCache(allocator, PAGE_SIZE, NUM_PAGES);
+ VirtualBufferCache vbc = new VirtualBufferCache(allocator, pageSize, numPages);
vbc.open();
- createFiles();
+ FileState[] fileStates = new FileState[numFiles];
+ for (int i = 0; i < numFiles; i++) {
+ fileStates[i] = new FileState(vbc, String.format("f%d", i));
+ }
- kPins(NUM_PAGES);
- assertTrue(pagesDisjointed());
+ kPins(numPages, numFiles, fileStates, vbc, random);
+ assertTrue(pagesDisjointed(numFiles, fileStates));
- kPins(NUM_OVERPIN);
- assertTrue(pagesDisjointed());
+ kPins(numOverpin, numFiles, fileStates, vbc, random);
+ assertTrue(pagesDisjointed(numFiles, fileStates));
- deleteFiles();
+ deleteFilesAndCheckMemory(numFiles, fileStates, vbc);
vbc.close();
}
- private boolean pagesDisjointed() {
+ @Test
+ public void testConcurrentUsersDifferentFiles() throws Exception {
+ final int numOverpin = 128;
+ final int pageSize = 256;
+ final int numFiles = 10;
+ final int numPages = 1000;
+ Random random = new Random();
+ ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+ VirtualBufferCache vbc = new VirtualBufferCache(allocator, pageSize, numPages);
+ vbc.open();
+ FileState[] fileStates = new FileState[numFiles];
+ User[] users = new User[numFiles];
+ for (int i = 0; i < numFiles; i++) {
+ fileStates[i] = new FileState(vbc, String.format("f%d", i));
+ users[i] = new User("User-" + i, vbc, fileStates[i]);
+ }
+ for (int i = 0; i < numPages; i++) {
+ int fsIdx = random.nextInt(numFiles);
+ users[fsIdx].add(new Request(Request.Type.PIN_PAGE));
+ }
+ // ensure all are done
+ wait(users);
+ assertTrue(pagesDisjointed(numFiles, fileStates));
+ for (int i = 0; i < numOverpin; i++) {
+ int fsIdx = random.nextInt(numFiles);
+ users[fsIdx].add(new Request(Request.Type.PIN_PAGE));
+ }
+ // ensure all are done
+ wait(users);
+ assertTrue(pagesDisjointed(numFiles, fileStates));
+ // shutdown users
+ shutdown(users);
+ deleteFilesAndCheckMemory(numFiles, fileStates, vbc);
+ vbc.close();
+ }
+
+ private void shutdown(User[] users) throws HyracksDataException, InterruptedException {
+ for (int i = 0; i < users.length; i++) {
+ users[i].stop();
+ }
+ }
+
+ private void wait(User[] users) throws InterruptedException {
+ for (int i = 0; i < users.length; i++) {
+ Request callback = new Request(Request.Type.CALLBACK);
+ users[i].add(callback);
+ callback.await();
+ }
+ }
+
+ @Test
+ public void testLargePages() throws Exception {
+ final int pageSize = 256;
+ final int numFiles = 3;
+ final int numPages = 1000;
+ ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+ VirtualBufferCache vbc = new VirtualBufferCache(allocator, pageSize, numPages);
+ vbc.open();
+ FileState[] fileStates = new FileState[numFiles];
+ for (int i = 0; i < numFiles; i++) {
+ fileStates[i] = new FileState(vbc, String.format("f%d", i));
+ }
+ // Get a large page that is 52 pages size
+ int fileIdx = 0;
+ FileState f = fileStates[fileIdx];
+ f.pin(52);
+ // Assert that 52 pages are accounted for
+ Assert.assertEquals(52, vbc.getUsage());
+ // Delete file
+ vbc.deleteFile(f.fileId);
+ // Assert that usage fell down to 0
+ Assert.assertEquals(0, vbc.getUsage());
+ // Assert that no pages are pre-allocated
+ Assert.assertEquals(0, vbc.getPreAllocatedPages());
+ // Next file
+ fileIdx++;
+ f = fileStates[fileIdx];
+ // Pin small pages to capacity
+ int count = 0;
+ while (vbc.getUsage() <= vbc.getPageBudget()) {
+ f.pin(1);
+ count++;
+ Assert.assertEquals(count, vbc.getUsage());
+ }
+ // Delete file
+ vbc.deleteFile(f.fileRef);
+ // Assert that usage fell down to 0
+ Assert.assertEquals(0, vbc.getUsage());
+ // Assert that small pages are available
+ Assert.assertEquals(vbc.getPreAllocatedPages(), vbc.getPageBudget());
+ // Next file
+ fileIdx++;
+ f = fileStates[fileIdx];
+ count = 0;
+ int sizeOfLargePage = 4;
+ while (vbc.getUsage() <= vbc.getPageBudget()) {
+ f.pin(sizeOfLargePage);
+ count += sizeOfLargePage;
+ Assert.assertEquals(count, vbc.getUsage());
+ Assert.assertEquals(Integer.max(0, vbc.getPageBudget() - count), vbc.getPreAllocatedPages());
+ }
+ // Delete file
+ vbc.deleteFile(f.fileId);
+ // Assert that usage fell down to 0
+ Assert.assertEquals(0, vbc.getUsage());
+ // Assert that no pages are pre-allocated
+ Assert.assertEquals(0, vbc.getPreAllocatedPages());
+ vbc.close();
+ }
+
+ private boolean pagesDisjointed(int numFiles, FileState[] fileStates) {
boolean disjoint = true;
- for (int i = 0; i < NUM_FILES; i++) {
+ for (int i = 0; i < numFiles; i++) {
FileState fi = fileStates[i];
- for (int j = i + 1; j < NUM_FILES; j++) {
+ for (int j = i + 1; j < numFiles; j++) {
FileState fj = fileStates[j];
- disjoint = disjoint && Collections.disjoint(fi.pinnedPages, fj.pinnedPages);
+ disjoint = disjoint && Collections.disjoint(fi.helper.pinnedPages, fj.helper.pinnedPages);
}
}
return disjoint;
}
- private void createFiles() throws Exception {
- for (int i = 0; i < NUM_FILES; i++) {
- FileState f = fileStates[i];
- String fName = String.format("f%d", i);
- f.fileRef = ioManager.resolve(fName);
- vbc.createFile(f.fileRef);
- f.fileId = vbc.getFileMapProvider().lookupFileId(f.fileRef);
+ private void deleteFilesAndCheckMemory(int numFiles, FileState[] fileStates, VirtualBufferCache vbc)
+ throws Exception {
+ // Get the size of the buffer cache
+ int totalInStates = 0;
+ for (int i = 0; i < numFiles; i++) {
+ totalInStates += fileStates[i].helper.pinnedPages.size();
}
- }
-
- private void deleteFiles() throws Exception {
- for (int i = 0; i < NUM_FILES; i++) {
+ Assert.assertEquals(totalInStates, vbc.getUsage());
+ int totalFree = 0;
+ Assert.assertEquals(totalFree, vbc.getPreAllocatedPages());
+ boolean hasLargePages = vbc.getLargePages() > 0;
+ for (int i = 0; i < numFiles; i++) {
+ int expectedToBeReclaimed = 0;
+ for (ICachedPage page : fileStates[i].helper.pinnedPages) {
+ expectedToBeReclaimed += page.getFrameSizeMultiplier();
+ }
vbc.deleteFile(fileStates[i].fileId);
+ totalFree += expectedToBeReclaimed;
+ Assert.assertEquals(totalInStates - totalFree, vbc.getUsage());
+ if (!hasLargePages) {
+ Assert.assertEquals(Integer.max(0, vbc.getPageBudget() - vbc.getUsage()), vbc.getPreAllocatedPages());
+ }
}
}
- private void kPins(int k) throws Exception {
+ private void kPins(int k, int numFiles, FileState[] fileStates, VirtualBufferCache vbc, Random random)
+ throws Exception {
int numPinned = 0;
while (numPinned < k) {
- int fsIdx = random.nextInt(NUM_FILES);
+ int fsIdx = random.nextInt(numFiles);
FileState f = fileStates[fsIdx];
- ICachedPage p = vbc.pin(BufferedFileHandle.getDiskPageId(f.fileId, f.pinCount), true);
- f.pinnedPages.add(p);
- ++f.pinCount;
+ f.pin(1);
++numPinned;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index e34ee0e..26ad457 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -78,10 +78,6 @@
ICachedPage page = null;
- // tryPin should fail
- page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId));
- Assert.assertNull(page);
-
// pin page should succeed
page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), true);
page.acquireWriteLatch();
@@ -89,12 +85,6 @@
for (int i = 0; i < num; i++) {
page.getBuffer().putInt(i * 4, i);
}
-
- // try pin should succeed
- ICachedPage page2 = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId));
- Assert.assertNotNull(page2);
- bufferCache.unpin(page2);
-
} finally {
page.releaseWriteLatch(true);
bufferCache.unpin(page);
@@ -102,31 +92,11 @@
bufferCache.closeFile(fileId);
- // This code is commented because the method pinSanityCheck in the BufferCache is commented.
- /*boolean exceptionThrown = false;
-
- // tryPin should fail since file is not open
- try {
- page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId));
- } catch (HyracksDataException e) {
- exceptionThrown = true;
- }
- Assert.assertTrue(exceptionThrown);
-
- // pin should fail since file is not open
- exceptionThrown = false;
- try {
- page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), false);
- } catch (HyracksDataException e) {
- exceptionThrown = true;
- }
- Assert.assertTrue(exceptionThrown);*/
-
// open file again
bufferCache.openFile(fileId);
// tryPin should succeed because page should still be cached
- page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, testPageId));
+ page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, testPageId), false);
Assert.assertNotNull(page);
page.acquireReadLatch();
try {