ASTERIXDB-1058: make Asterix compatible with lazy LSM memory allocation
- Adapt memory budget calculation to lazy LSM memory allocation.
- Add IDatasetLifecycleManager interface.
Change-Id: I4ea1eb129fe3043d43b077473dc29d17a97dfcc2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/408
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 2a15384..2e7c23f 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.api.AsterixThreadExecutor;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.AsterixCompilerProperties;
import org.apache.asterix.common.config.AsterixExternalProperties;
import org.apache.asterix.common.config.AsterixFeedProperties;
@@ -42,16 +43,13 @@
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.feeds.FeedManager;
import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
-import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -99,7 +97,7 @@
private AsterixBuildProperties buildProperties;
private AsterixThreadExecutor threadExecutor;
- private DatasetLifecycleManager indexLifecycleManager;
+ private IDatasetLifecycleManager datasetLifecycleManager;
private IFileMapManager fileMapManager;
private IBufferCache bufferCache;
private ITransactionSubsystem txnSubsystem;
@@ -112,7 +110,7 @@
private IFeedManager feedManager;
- public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) throws AsterixException {
+ public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
this.ncApplicationContext = ncApplicationContext;
compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
@@ -143,8 +141,7 @@
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
ioManager, ncApplicationContext.getNodeId());
- localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
- .createRepository();
+ localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
@@ -152,8 +149,8 @@
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
txnProperties);
- indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
- MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, (LogManager) txnSubsystem.getLogManager());
+ datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
+ MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager());
isShuttingdown = false;
@@ -165,7 +162,7 @@
lccm.register((ILifeCycleComponent) bufferCache);
lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
- lccm.register((ILifeCycleComponent) indexLifecycleManager);
+ lccm.register((ILifeCycleComponent) datasetLifecycleManager);
lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
}
@@ -193,8 +190,8 @@
return txnSubsystem;
}
- public IIndexLifecycleManager getIndexLifecycleManager() {
- return indexLifecycleManager;
+ public IDatasetLifecycleManager getDatasetLifecycleManager() {
+ return datasetLifecycleManager;
}
public double getBloomFilterFalsePositiveRate() {
@@ -258,12 +255,12 @@
@Override
public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
- return indexLifecycleManager.getVirtualBufferCaches(datasetID);
+ return datasetLifecycleManager.getVirtualBufferCaches(datasetID);
}
@Override
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
- return indexLifecycleManager.getOperationTracker(datasetID);
+ return datasetLifecycleManager.getOperationTracker(datasetID);
}
@Override
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
index 1d504dd..570c3c9 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContextProdiverForRecovery.java
@@ -22,10 +22,10 @@
import org.apache.asterix.common.api.AsterixThreadExecutor;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -58,8 +58,8 @@
}
@Override
- public IIndexLifecycleManager getIndexLifecycleManager() {
- return asterixAppRuntimeContext.getIndexLifecycleManager();
+ public IDatasetLifecycleManager getDatasetLifecycleManager() {
+ return asterixAppRuntimeContext.getDatasetLifecycleManager();
}
@Override
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3c5549f..f86ed8a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -28,7 +28,6 @@
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -60,7 +59,7 @@
public ILocalResourceRepository getLocalResourceRepository();
- public IIndexLifecycleManager getIndexLifecycleManager();
+ public IDatasetLifecycleManager getDatasetLifecycleManager();
public ResourceIdFactory getResourceIdFactory();
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java
index f978a8a..1e4869e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixRuntimeComponentsProvider.java
@@ -19,7 +19,6 @@
package org.apache.asterix.common.api;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -35,7 +34,7 @@
public IFileMapProvider getFileMapManager();
- public IIndexLifecycleManager getIndexLifecycleManager();
+ public IDatasetLifecycleManager getDatasetLifecycleManager();
public double getBloomFilterFalsePositiveRate();
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
new file mode 100644
index 0000000..86e6db5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.util.List;
+
+import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndex;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+
+public interface IDatasetLifecycleManager extends IIndexLifecycleManager {
+ /**
+ * @param datasetID
+ * @param resourceID
+ * @return The corresponding index, or null if it is not found in the registered indexes.
+ * @throws HyracksDataException
+ */
+ IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException;
+
+ /**
+ * Allocates the memory budget of the dataset in the virtual buffer cache.
+ * @param datasetID
+ * @throws HyracksDataException
+ */
+ void allocateDatasetMemory(int datasetID) throws HyracksDataException;
+
+ /**
+ * Flushes all open datasets synchronously.
+ * @throws HyracksDataException
+ */
+ void flushAllDatasets() throws HyracksDataException;
+
+ /**
+ * Schedules asynchronous flush on datasets that have memory components with first LSN < nonSharpCheckpointTargetLSN.
+ * @param nonSharpCheckpointTargetLSN
+ * @throws HyracksDataException
+ */
+ void scheduleAsyncFlushForLaggingDatasets(long nonSharpCheckpointTargetLSN) throws HyracksDataException;
+
+ /**
+ * creates (if necessary) and returns the dataset info.
+ * @param datasetID
+ * @return
+ */
+ DatasetInfo getDatasetInfo(int datasetID);
+
+ /**
+ * @param datasetId
+ * the dataset id to be flushed.
+ * @param asyncFlush
+ * a flag indicating whether to wait for the flush to complete or not.
+ * @throws HyracksDataException
+ */
+ void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException;
+
+ /**
+ * creates (if necessary) and returns the primary index operation tracker of a dataset.
+ * @param datasetID
+ * @return
+ */
+ ILSMOperationTracker getOperationTracker(int datasetID);
+
+ /**
+ * creates (if necessary) and returns the dataset virtual buffer caches.
+ * @param datasetID
+ * @return
+ */
+ List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 3b824ec..d78e2cb 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.context;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -28,11 +29,11 @@
public class BaseOperationTracker implements ILSMOperationTracker {
- protected final DatasetLifecycleManager datasetLifecycleManager;
+ protected final IDatasetLifecycleManager datasetLifecycleManager;
protected final int datasetID;
- protected DatasetInfo dsInfo;
-
- public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID, DatasetInfo dsInfo) {
+ protected final DatasetInfo dsInfo;
+
+ public BaseOperationTracker(IDatasetLifecycleManager datasetLifecycleManager, int datasetID, DatasetInfo dsInfo) {
this.datasetLifecycleManager = datasetLifecycleManager;
this.datasetID = datasetID;
this.dsInfo = dsInfo;
@@ -59,10 +60,6 @@
IModificationOperationCallback modificationCallback) throws HyracksDataException {
}
- public void setDatasetInfo(DatasetInfo dsInfo){
- this.dsInfo = dsInfo;
- }
-
public void exclusiveJobCommitted() throws HyracksDataException {
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index 567f75e..a7374d3 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -25,7 +25,7 @@
import java.util.Map;
import java.util.Set;
-import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IndexException;
@@ -42,11 +42,11 @@
private long maxMergableComponentSize;
private int maxToleranceComponentCount;
- private final DatasetLifecycleManager datasetLifecycleManager;
+ private final IDatasetLifecycleManager datasetLifecycleManager;
private final int datasetID;
-
+
public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) {
- this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
+ this.datasetLifecycleManager = (DatasetLifecycleManager)datasetLifecycleManager;
this.datasetID = datasetID;
}
@@ -70,7 +70,7 @@
}
}
if (fullMergeIsRequested) {
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
accessor.scheduleFullMerge(index.getIOOperationCallback());
return;
@@ -113,7 +113,7 @@
// Reverse the components order back to its original order
Collections.reverse(mergableComponents);
- ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
+ ILSMIndexAccessor accessor = lsmIndex.createAccessor(
NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
index 385bbbe..ce405fc 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicyFactory.java
@@ -25,6 +25,7 @@
import java.util.Set;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
@@ -42,8 +43,8 @@
@Override
public ILSMMergePolicy createMergePolicy(Map<String, String> properties, IHyracksTaskContext ctx) {
- DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
- .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+ IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject()).getDatasetLifecycleManager();
ILSMMergePolicy policy = new CorrelatedPrefixMergePolicy(dslcManager, datasetID);
policy.configure(properties);
return policy;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 741e106..b2e7b69 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.ILocalResourceMetadata;
import org.apache.asterix.common.config.AsterixStorageProperties;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -37,7 +38,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -50,7 +50,7 @@
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
import org.apache.hyracks.storage.common.file.LocalResource;
-public class DatasetLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
+public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent {
private final AsterixStorageProperties storageProperties;
private final Map<Integer, List<IVirtualBufferCache>> datasetVirtualBufferCaches;
private final Map<Integer, ILSMOperationTracker> datasetOpTrackers;
@@ -108,6 +108,7 @@
dsInfo.isExternal = !index.hasMemoryComponents();
dsInfo.isRegistered = true;
}
+
if (dsInfo.indexes.containsKey(resourceID)) {
throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
}
@@ -193,21 +194,8 @@
throw new HyracksDataException("Failed to open index with resource ID " + resourceID
+ " since it does not exist.");
}
-
- // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
if (!dsInfo.isOpen && !dsInfo.isExternal) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
- assert vbcs != null;
- long additionalSize = 0;
- for (IVirtualBufferCache vbc : vbcs) {
- additionalSize += vbc.getNumPages() * vbc.getPageSize();
- }
- while (used + additionalSize > capacity) {
- if (!evictCandidateDataset()) {
- throw new HyracksDataException("Cannot activate index since memory budget would be exceeded.");
- }
- }
- used += additionalSize;
+ initializeDatasetVirtualBufferCache(did);
}
dsInfo.isOpen = true;
@@ -226,7 +214,6 @@
// We will take a dataset that has no active transactions, it is open (a dataset consuming memory),
// that is not being used (refcount == 0) and has been least recently used. The sort order defined
// for DatasetInfo maintains this. See DatasetInfo.compareTo().
-
List<DatasetInfo> datasetInfosList = new ArrayList<DatasetInfo>(datasetInfos.values());
Collections.sort(datasetInfosList);
for (DatasetInfo dsInfo : datasetInfosList) {
@@ -241,7 +228,7 @@
return false;
}
- private void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
+ private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
if (iInfo.isOpen) {
ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -261,6 +248,7 @@
}
}
+ @Override
public DatasetInfo getDatasetInfo(int datasetID) {
synchronized (datasetInfos) {
DatasetInfo dsInfo = datasetInfos.get(datasetID);
@@ -306,32 +294,39 @@
synchronized (datasetVirtualBufferCaches) {
List<IVirtualBufferCache> vbcs = datasetVirtualBufferCaches.get(datasetID);
if (vbcs == null) {
- vbcs = new ArrayList<IVirtualBufferCache>();
- int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
- .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
- for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
- MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(
- new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(), numPages
- / storageProperties.getMemoryComponentsNum()));
- vbcs.add(vbc);
+ initializeDatasetVirtualBufferCache(datasetID);
+ vbcs = datasetVirtualBufferCaches.get(datasetID);
+ if (vbcs == null) {
+ throw new IllegalStateException("Could not find dataset " + datasetID + " virtual buffer cache.");
}
- datasetVirtualBufferCaches.put(datasetID, vbcs);
}
return vbcs;
}
}
- private void removeDatasetFromCache(int datasetID) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(datasetID);
- assert vbcs != null;
- for (IVirtualBufferCache vbc : vbcs) {
- used -= (vbc.getNumPages() * vbc.getPageSize());
- }
+ private void removeDatasetFromCache(int datasetID) throws HyracksDataException {
+ deallocateDatasetMemory(datasetID);
datasetInfos.remove(datasetID);
datasetVirtualBufferCaches.remove(datasetID);
datasetOpTrackers.remove(datasetID);
}
+ private void initializeDatasetVirtualBufferCache(int datasetID) {
+ List<IVirtualBufferCache> vbcs = new ArrayList<IVirtualBufferCache>();
+ synchronized (datasetVirtualBufferCaches) {
+ int numPages = datasetID < firstAvilableUserDatasetID ? storageProperties
+ .getMetadataMemoryComponentNumPages() : storageProperties.getMemoryComponentNumPages();
+ for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
+ MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(new VirtualBufferCache(
+ new HeapBufferAllocator(), storageProperties.getMemoryComponentPageSize(), numPages
+ / storageProperties.getMemoryComponentsNum()));
+ vbcs.add(vbc);
+ }
+ datasetVirtualBufferCaches.put(datasetID, vbcs);
+ }
+ }
+
+ @Override
public ILSMOperationTracker getOperationTracker(int datasetID) {
synchronized (datasetOpTrackers) {
ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
@@ -376,12 +371,14 @@
private int numActiveIOOps;
private boolean isExternal;
private boolean isRegistered;
+ private boolean memoryAllocated;
public DatasetInfo(int datasetID) {
this.indexes = new HashMap<Long, IndexInfo>();
this.lastAccess = -1;
this.datasetID = datasetID;
this.isRegistered = false;
+ this.memoryAllocated = false;
}
@Override
@@ -406,7 +403,7 @@
notifyAll();
}
- public synchronized Set<ILSMIndex> getDatasetIndexes() throws HyracksDataException {
+ public synchronized Set<ILSMIndex> getDatasetIndexes() {
Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
for (IndexInfo iInfo : indexes.values()) {
if (iInfo.isOpen) {
@@ -454,7 +451,16 @@
@Override
public String toString() {
return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
- + ", lastAccess: " + lastAccess + "}";
+ + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
+ + memoryAllocated;
+ }
+
+ public boolean isMemoryAllocated() {
+ return memoryAllocated;
+ }
+
+ public int getDatasetID() {
+ return datasetID;
}
}
@@ -463,12 +469,14 @@
used = 0;
}
+ @Override
public synchronized void flushAllDatasets() throws HyracksDataException {
for (DatasetInfo dsInfo : datasetInfos.values()) {
flushDatasetOpenIndexes(dsInfo, false);
}
}
+ @Override
public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
DatasetInfo datasetInfo = datasetInfos.get(datasetId);
if (datasetInfo != null) {
@@ -476,6 +484,7 @@
}
}
+ @Override
public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
//schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
for (DatasetInfo dsInfo : datasetInfos.values()) {
@@ -537,7 +546,6 @@
accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
}
} else {
-
for (IndexInfo iInfo : dsInfo.indexes.values()) {
// TODO: This is not efficient since we flush the indexes sequentially.
// Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
@@ -557,14 +565,12 @@
throw new HyracksDataException(e);
}
}
-
}
try {
flushDatasetOpenIndexes(dsInfo, false);
} catch (Exception e) {
throw new HyracksDataException(e);
}
-
for (IndexInfo iInfo : dsInfo.indexes.values()) {
if (iInfo.isOpen) {
ILSMOperationTracker opTracker = iInfo.index.getOperationTracker();
@@ -576,7 +582,6 @@
assert iInfo.referenceCount == 0;
}
dsInfo.isOpen = false;
-
removeDatasetFromCache(dsInfo.datasetID);
}
@@ -628,4 +633,48 @@
outputStream.write(sb.toString().getBytes());
}
-}
+
+ @Override
+ public synchronized void allocateDatasetMemory(int datasetId) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetId);
+ if (dsInfo == null) {
+ throw new HyracksDataException("Failed to allocate memory for dataset with ID " + datasetId
+ + " since it is not open.");
+ }
+ synchronized (dsInfo) {
+ // This is not needed for external datasets' indexes since they never use the virtual buffer cache.
+ if (!dsInfo.memoryAllocated && !dsInfo.isExternal) {
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+ long additionalSize = 0;
+ for (IVirtualBufferCache vbc : vbcs) {
+ additionalSize += vbc.getNumPages() * vbc.getPageSize();
+ }
+ while (used + additionalSize > capacity) {
+ if (!evictCandidateDataset()) {
+ throw new HyracksDataException("Cannot allocate dataset " + dsInfo.datasetID
+ + " memory since memory budget would be exceeded.");
+ }
+ }
+ used += additionalSize;
+ dsInfo.memoryAllocated = true;
+ }
+ }
+ }
+
+ private synchronized void deallocateDatasetMemory(int datasetId) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetId);
+ if (dsInfo == null) {
+ throw new HyracksDataException("Failed to deallocate memory for dataset with ID " + datasetId
+ + " since it is not open.");
+ }
+ synchronized (dsInfo) {
+ if (dsInfo.isOpen && dsInfo.memoryAllocated) {
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= (vbc.getNumPages() * vbc.getPageSize());
+ }
+ dsInfo.memoryAllocated = false;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 6c63248..c3d75b1 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -48,7 +49,7 @@
private boolean flushOnExit = false;
private boolean flushLogCreated = false;
- public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
+ public PrimaryIndexOperationTracker(IDatasetLifecycleManager datasetLifecycleManager, int datasetID,
ILogManager logManager, DatasetInfo dsInfo) {
super(datasetLifecycleManager, datasetID, dsInfo);
this.logManager = logManager;
@@ -59,6 +60,9 @@
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
+ if (!dsInfo.isMemoryAllocated()) {
+ datasetLifecycleManager.allocateDatasetMemory(dsInfo.getDatasetID());
+ }
incrementNumActiveOperations(modificationCallback);
} else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
dsInfo.declareActiveIOOperation();
@@ -77,7 +81,7 @@
@Override
public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
- throws HyracksDataException {
+ throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
decrementNumActiveOperations(modificationCallback);
if (numActiveOperations.get() == 0) {
@@ -108,7 +112,7 @@
}
if (needsFlush || flushOnExit) {
- //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is schedule.
+ //Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled.
for (ILSMIndex lsmIndex : indexes) {
AbstractLSMIndex abstractLSMIndex = ((AbstractLSMIndex) lsmIndex);
ILSMOperationTracker opTracker = abstractLSMIndex.getOperationTracker();
@@ -133,7 +137,7 @@
}
}
- //Since this method is called sequentially by LogPage.notifyFlushTerminator in the sequence flush were scheduled.
+ //This method is called sequentially by LogPage.notifyFlushTerminator in the sequence flushes were scheduled.
public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index c965dd2..d308564 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -22,8 +22,8 @@
import org.apache.asterix.common.api.AsterixThreadExecutor;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -42,7 +42,7 @@
public ITransactionSubsystem getTransactionSubsystem();
- public IIndexLifecycleManager getIndexLifecycleManager();
+ public IDatasetLifecycleManager getDatasetLifecycleManager();
public double getBloomFilterFalsePositiveRate();
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java
index 69ea1f1..f030e18 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestsUtils.java
@@ -212,12 +212,19 @@
// In future this may be changed depending on the requested
// output format sent to the servlet.
String errorBody = method.getResponseBodyAsString();
- JSONObject result = new JSONObject(errorBody);
- String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
- result.getString("stacktrace") };
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
- throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
- + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+ JSONObject result = null;
+ try {
+ result = new JSONObject(errorBody);
+ } catch (Exception e) {
+ throw new Exception(errorBody);
+ }
+ if (result != null) {
+ String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"),
+ result.getString("stacktrace") };
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]);
+ throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine()
+ + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]);
+ }
}
return statusCode;
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 698a54f..97260a1 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -85,8 +85,8 @@
*/
public class MetadataManager implements IMetadataManager {
private static final int INITIAL_SLEEP_TIME = 64;
- private static final int RETRY_MULTIPLIER = 4;
- private static final int MAX_RETRY_COUNT = 6;
+ private static final int RETRY_MULTIPLIER = 5;
+ private static final int MAX_RETRY_COUNT = 10;
// Set in init().
public static MetadataManager INSTANCE;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index d600f57..07675fe 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -24,6 +24,7 @@
import java.util.List;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
@@ -110,7 +111,7 @@
private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID);
- private IIndexLifecycleManager indexLifecycleManager;
+ private IDatasetLifecycleManager datasetLifecycleManager;
private ITransactionSubsystem transactionSubsystem;
public static final MetadataNode INSTANCE = new MetadataNode();
@@ -121,7 +122,7 @@
public void initialize(IAsterixAppRuntimeContext runtimeContext) {
this.transactionSubsystem = runtimeContext.getTransactionSubsystem();
- this.indexLifecycleManager = runtimeContext.getIndexLifecycleManager();
+ this.datasetLifecycleManager = runtimeContext.getDatasetLifecycleManager();
}
@Override
@@ -285,9 +286,9 @@
throws Exception {
long resourceID = metadataIndex.getResourceID();
String resourceName = metadataIndex.getFile().toString();
- ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName);
+ ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
try {
- indexLifecycleManager.open(resourceName);
+ datasetLifecycleManager.open(resourceName);
// prepare a Callback for logging
IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
@@ -308,7 +309,7 @@
} catch (Exception e) {
throw e;
} finally {
- indexLifecycleManager.close(resourceName);
+ datasetLifecycleManager.close(resourceName);
}
}
@@ -635,9 +636,9 @@
throws Exception {
long resourceID = metadataIndex.getResourceID();
String resourceName = metadataIndex.getFile().toString();
- ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName);
+ ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.getIndex(resourceName);
try {
- indexLifecycleManager.open(resourceName);
+ datasetLifecycleManager.open(resourceName);
// prepare a Callback for logging
IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
metadataIndex, lsmIndex, IndexOperation.DELETE);
@@ -655,7 +656,7 @@
} catch (Exception e) {
throw e;
} finally {
- indexLifecycleManager.close(resourceName);
+ datasetLifecycleManager.close(resourceName);
}
}
@@ -969,8 +970,8 @@
try {
IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
String resourceName = index.getFile().toString();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
- indexLifecycleManager.open(resourceName);
+ IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ datasetLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -988,11 +989,11 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceName);
+ datasetLifecycleManager.close(resourceName);
index = MetadataPrimaryIndexes.DATASET_DATASET;
- indexInstance = indexLifecycleManager.getIndex(resourceName);
- indexLifecycleManager.open(resourceName);
+ indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ datasetLifecycleManager.open(resourceName);
indexAccessor = indexInstance
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1010,11 +1011,11 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceName);
+ datasetLifecycleManager.close(resourceName);
index = MetadataPrimaryIndexes.INDEX_DATASET;
- indexInstance = indexLifecycleManager.getIndex(resourceName);
- indexLifecycleManager.open(resourceName);
+ indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ datasetLifecycleManager.open(resourceName);
indexAccessor = indexInstance
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1033,7 +1034,7 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceName);
+ datasetLifecycleManager.close(resourceName);
} catch (Exception e) {
e.printStackTrace();
}
@@ -1044,8 +1045,8 @@
IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
String resourceName = index.getFile().toString();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
- indexLifecycleManager.open(resourceName);
+ IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ datasetLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1074,7 +1075,7 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceName);
+ datasetLifecycleManager.close(resourceName);
}
@Override
@@ -1082,8 +1083,8 @@
int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
try {
String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
- indexLifecycleManager.open(resourceName);
+ IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
+ datasetLifecycleManager.open(resourceName);
try {
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -1110,7 +1111,7 @@
rangeCursor.close();
}
} finally {
- indexLifecycleManager.close(resourceName);
+ datasetLifecycleManager.close(resourceName);
}
} catch (Exception e) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index c94133a..0605b47 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -31,6 +31,7 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.ILocalResourceMetadata;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -38,7 +39,6 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.metadata.IDatasetDetails;
@@ -76,7 +76,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
@@ -107,7 +106,7 @@
private static IBufferCache bufferCache;
private static IFileMapProvider fileMapProvider;
- private static IIndexLifecycleManager indexLifecycleManager;
+ private static IDatasetLifecycleManager dataLifecycleManager;
private static ILocalResourceRepository localResourceRepository;
private static IIOManager ioManager;
@@ -155,7 +154,7 @@
nodeNames = metadataProperties.getNodeNames();
// nodeStores = asterixProperity.getStores();
- indexLifecycleManager = runtimeContext.getIndexLifecycleManager();
+ dataLifecycleManager = runtimeContext.getDatasetLifecycleManager();
localResourceRepository = runtimeContext.getLocalResourceRepository();
bufferCache = runtimeContext.getBufferCache();
fileMapProvider = runtimeContext.getFileMapManager();
@@ -391,8 +390,8 @@
LSMBTree lsmBtree = null;
long resourceID = -1;
ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
- .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
- index.getDatasetId().getId(), ((DatasetLifecycleManager) indexLifecycleManager).getDatasetInfo(index
+ .getDatasetId().getId()) : new BaseOperationTracker(dataLifecycleManager,
+ index.getDatasetId().getId(), dataLifecycleManager.getDatasetInfo(index
.getDatasetId().getId()));
final String path = file.getFile().getPath();
if (create) {
@@ -406,7 +405,7 @@
bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
- GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
+ GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), opTracker,
runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
null, null, null, null, true);
@@ -420,11 +419,11 @@
localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
- indexLifecycleManager.register(path, lsmBtree);
+ dataLifecycleManager.register(path, lsmBtree);
} else {
final LocalResource resource = localResourceRepository.getResourceByName(path);
resourceID = resource.getResourceId();
- lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resource.getResourceName());
+ lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(resource.getResourceName());
if (lsmBtree == null) {
lsmBtree = LSMBTreeUtils.createLSMTree(
virtualBufferCaches,
@@ -436,10 +435,10 @@
bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
- GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
+ GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager), opTracker,
runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
.createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true);
- indexLifecycleManager.register(path, lsmBtree);
+ dataLifecycleManager.register(path, lsmBtree);
}
}
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index b4017fa..122a8ff 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -21,7 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ILockManager;
@@ -73,8 +73,7 @@
try {
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
- DatasetLifecycleManager datasetLifeCycleManager = (DatasetLifecycleManager) runtimeCtx
- .getIndexLifecycleManager();
+ IDatasetLifecycleManager datasetLifeCycleManager = runtimeCtx.getDatasetLifecycleManager();
ILockManager lockManager = runtimeCtx.getTransactionSubsystem().getLockManager();
ITransactionManager txnManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
// get the local transaction
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index b8673bb..e5ad1b4 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -107,10 +107,15 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>18.0</version>
- </dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.10.19</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 4c2f25d..58bc44e 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -55,7 +55,7 @@
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getIndexLifecycleManager();
+ .getDatasetLifecycleManager();
ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
index 1693685..876fcb3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerProvider.java
@@ -20,7 +20,7 @@
package org.apache.asterix.transaction.management.opcallbacks;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
@@ -37,8 +37,8 @@
@Override
public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
- DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
- .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+ IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject()).getDatasetLifecycleManager();
return dslcManager.getOperationTracker(datasetID);
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 9c897f2..48ebff3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -51,7 +51,7 @@
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getIndexLifecycleManager();
+ .getDatasetLifecycleManager();
ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index 7b17193..7b1ec04 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -19,8 +19,8 @@
package org.apache.asterix.transaction.management.opcallbacks;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
@@ -37,8 +37,8 @@
@Override
public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
- DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
- .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
+ IDatasetLifecycleManager dslcManager = ((IAsterixAppRuntimeContext) ctx
+ .getJobletContext().getApplicationContext().getApplicationObject()).getDatasetLifecycleManager();
return new BaseOperationTracker(dslcManager, datasetID, dslcManager.getDatasetInfo(datasetID));
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 4de0749..23eb2be 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -52,7 +52,7 @@
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getIndexLifecycleManager();
+ .getDatasetLifecycleManager();
ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index c2f56a0..6e0394a 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -52,7 +52,7 @@
Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getIndexLifecycleManager();
+ .getDatasetLifecycleManager();
ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
index 5851240..26f56b7 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -57,10 +56,10 @@
bloomFilterKeyFields,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()),
- new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
- .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ runtimeContextProvider.getDatasetLifecycleManager()),
+ new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(),
+ datasetID, runtimeContextProvider.getDatasetLifecycleManager()
+ .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), -1, true);
return lsmBTree;
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
index af315f9..beb1bb8 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -69,9 +68,9 @@
btreeCmpFactories,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()),
- new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ runtimeContextProvider.getDatasetLifecycleManager()),
+ new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(),
+ datasetID, runtimeContextProvider.getDatasetLifecycleManager()
.getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), buddyBtreeFields, -1,
true);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
index 576eccd..b9f5af9 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -69,10 +68,10 @@
rtreePolicyType,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()),
- new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
- .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider
- .getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider
+ runtimeContextProvider.getDatasetLifecycleManager()),
+ new BaseOperationTracker(runtimeContextProvider
+ .getDatasetLifecycleManager(), datasetID, runtimeContextProvider
+ .getDatasetLifecycleManager().getDatasetInfo(datasetID)), runtimeContextProvider
.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
.createIOOperationCallback(), linearizeCmpFactory, btreeFields, -1, true);
} catch (TreeIndexException e) {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 495d56e..0adcf53 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -76,10 +76,10 @@
bloomFilterKeyFields,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()),
+ runtimeContextProvider.getDatasetLifecycleManager()),
isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID,
- ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ runtimeContextProvider.getDatasetLifecycleManager(), datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager()
.getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), isPrimary, filterTypeTraits,
filterCmpFactories, btreeFields, filterFields, true);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index d76702e..e635349 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -22,7 +22,6 @@
import java.util.Map;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -90,10 +89,10 @@
filePath,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()),
- new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
- .getIndexLifecycleManager(), datasetID,
- ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ runtimeContextProvider.getDatasetLifecycleManager()),
+ new BaseOperationTracker(runtimeContextProvider
+ .getDatasetLifecycleManager(), datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager()
.getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
@@ -111,10 +110,9 @@
filePath,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()),
- new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
- .getIndexLifecycleManager(), datasetID,
- ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ runtimeContextProvider.getDatasetLifecycleManager()),
+ new BaseOperationTracker(runtimeContextProvider.getDatasetLifecycleManager(), datasetID,
+ runtimeContextProvider.getDatasetLifecycleManager()
.getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 31a3283..c99e052 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -23,7 +23,6 @@
import java.util.Map;
import org.apache.asterix.common.context.BaseOperationTracker;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -91,10 +90,10 @@
rtreePolicyType,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()),
- new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
- .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider
- .getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider
+ runtimeContextProvider.getDatasetLifecycleManager()),
+ new BaseOperationTracker(runtimeContextProvider
+ .getDatasetLifecycleManager(), datasetID, runtimeContextProvider
+ .getDatasetLifecycleManager().getDatasetInfo(datasetID)), runtimeContextProvider
.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
.createIOOperationCallback(), linearizeCmpFactory, rtreeFields, btreeFields,
filterTypeTraits, filterCmpFactories, filterFields, true);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index f6880db..65ed01c 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -18,17 +18,17 @@
*/
package org.apache.asterix.transaction.management.service.locking;
+import static org.mockito.Mockito.mock;
+
import java.util.List;
import java.util.concurrent.Executors;
import org.apache.asterix.common.api.AsterixThreadExecutor;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -40,7 +40,7 @@
class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
- IIndexLifecycleManager ilm = new IndexLifecycleManager();
+ IDatasetLifecycleManager dlcm = mock(IDatasetLifecycleManager.class);
@Override
public AsterixThreadExecutor getThreadExecutor() {
@@ -63,8 +63,8 @@
}
@Override
- public IIndexLifecycleManager getIndexLifecycleManager() {
- return ilm;
+ public IDatasetLifecycleManager getDatasetLifecycleManager() {
+ return dlcm;
}
@Override
@@ -106,41 +106,4 @@
public IAsterixAppRuntimeContext getAppContext() {
throw new UnsupportedOperationException();
}
-
- static class IndexLifecycleManager implements IIndexLifecycleManager {
- @Override
- public List<IIndex> getOpenIndexes() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void register(String resourceName, IIndex index) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void open(String resourceName) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close(String resourceName) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IIndex getIndex(String resourceName) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void unregister(String resourceName) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
- }
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index fec04e4..9bc4c53 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -47,8 +47,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.ILocalResourceMetadata;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.transactions.DatasetId;
@@ -70,7 +70,6 @@
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -96,7 +95,7 @@
private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
private static final long MEGABYTE = 1024L * 1024L;
private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
- private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //2MB;
+ private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //4MB;
/**
* A file at a known location that contains the LSN of the last log record
@@ -105,7 +104,7 @@
private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
private SystemState state;
- public RecoveryManager(TransactionSubsystem txnSubsystem) throws ACIDException {
+ public RecoveryManager(TransactionSubsystem txnSubsystem) {
this.txnSubsystem = txnSubsystem;
this.logMgr = (LogManager) txnSubsystem.getLogManager();
this.checkpointHistory = this.txnSubsystem.getTransactionProperties().getCheckpointHistory();
@@ -268,7 +267,9 @@
boolean foundWinner = false;
IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
- IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
+ //get datasetLifeCycleManager
+ IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
.loadAndGetAllResources();
@@ -320,17 +321,17 @@
//get index instance from IndexLifeCycleManager
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
- index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName());
+ index = (ILSMIndex) datasetLifecycleManager.getIndex(localResource.getResourceName());
if (index == null) {
//#. create index instance and register to indexLifeCycleManager
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
localResource.getResourceName(), localResource.getPartition());
- indexLifecycleManager.register(localResource.getResourceName(), index);
- indexLifecycleManager.open(localResource.getResourceName());
+ datasetLifecycleManager.register(localResource.getResourceName(), index);
+ datasetLifecycleManager.open(localResource.getResourceName());
//#. get maxDiskLastLSN
- ILSMIndex lsmIndex = (ILSMIndex) index;
+ ILSMIndex lsmIndex = index;
maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
.getComponentLSN(lsmIndex.getImmutableComponents());
@@ -341,7 +342,7 @@
}
if (LSN > maxDiskLastLsn) {
- redo(logRecord);
+ redo(logRecord, datasetLifecycleManager);
redoCount++;
}
}
@@ -361,7 +362,7 @@
//close all indexes
Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
for (long r : resourceIdList) {
- indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
+ datasetLifecycleManager.close(resourcesMap.get(r).getResourceName());
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -398,8 +399,8 @@
//right after the new checkpoint file is written.
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager) txnSubsystem
- .getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+ IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
//flush all in-memory components if it is the sharp checkpoint
if (isSharpCheckpoint) {
datasetLifecycleManager.flushAllDatasets();
@@ -474,9 +475,9 @@
}
public long getMinFirstLSN() throws HyracksDataException {
- IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getIndexLifecycleManager();
- List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+ IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+ List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
long firstLSN;
//the min first lsn can only be the current append or smaller
long minFirstLSN = logMgr.getAppendLSN();
@@ -557,7 +558,7 @@
return parentDir.listFiles(filter);
}
- private String getCheckpointFileName(String baseDir, String suffix) {
+ private static String getCheckpointFileName(String baseDir, String suffix) {
if (!baseDir.endsWith(System.getProperty("file.separator"))) {
baseDir += System.getProperty("file.separator");
}
@@ -703,6 +704,8 @@
//undo loserTxn's effect
LOGGER.log(Level.INFO, "undoing loser transaction's effect");
+ IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
//TODO sort loser entities by smallest LSN to undo in one pass.
Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
int undoCount = 0;
@@ -719,7 +722,7 @@
if (IS_DEBUG_MODE) {
LOGGER.info(logRecord.getLogRecordForDisplay());
}
- undo(logRecord);
+ undo(logRecord, datasetLifecycleManager);
undoCount++;
}
}
@@ -749,10 +752,10 @@
// do nothing
}
- private void undo(ILogRecord logRecord) {
+ private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
try {
- ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
+ logRecord.getResourceId());
ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
@@ -767,10 +770,10 @@
}
}
- private void redo(ILogRecord logRecord) {
+ private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
try {
- ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
+ logRecord.getResourceId());
ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
@@ -982,7 +985,7 @@
public TxnId() {
}
- private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
+ private static void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
int readOffset = pkValue.getFieldStart(0);
byte[] readBuffer = pkValue.getFieldData(0);
for (int i = 0; i < pkSize; i++) {
@@ -1034,7 +1037,7 @@
}
}
- private boolean isEqual(byte[] a, byte[] b, int size) {
+ private static boolean isEqual(byte[] a, byte[] b, int size) {
for (int i = 0; i < size; i++) {
if (a[i] != b[i]) {
return false;
@@ -1043,7 +1046,7 @@
return true;
}
- private boolean isEqual(byte[] a, ITupleReference b, int size) {
+ private static boolean isEqual(byte[] a, ITupleReference b, int size) {
int readOffset = b.getFieldStart(0);
byte[] readBuffer = b.getFieldData(0);
for (int i = 0; i < size; i++) {
@@ -1054,7 +1057,7 @@
return true;
}
- private boolean isEqual(ITupleReference a, ITupleReference b, int size) {
+ private static boolean isEqual(ITupleReference a, ITupleReference b, int size) {
int aOffset = a.getFieldStart(0);
byte[] aBuffer = a.getFieldData(0);
int bOffset = b.getFieldStart(0);
@@ -1067,7 +1070,7 @@
return true;
}
- public void serialize(ByteBuffer buffer) throws IOException {
+ public void serialize(ByteBuffer buffer) {
buffer.putInt(jobId);
buffer.putInt(datasetId);
buffer.putInt(pkHashValue);
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 2551ee2..1686e17 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -19,8 +19,8 @@
package org.apache.asterix.transaction.management.service.transaction;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
@@ -65,9 +65,9 @@
}
@Override
- public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+ public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
- .getIndexLifecycleManager();
+ .getDatasetLifecycleManager();
}
@Override
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 09fbb06..eeb65e2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -56,7 +56,7 @@
this.recoveryManager = new RecoveryManager(this);
if (asterixAppRuntimeContextProvider != null) {
this.checkpointThread = new CheckpointThread(recoveryManager,
- asterixAppRuntimeContextProvider.getIndexLifecycleManager(),logManager,
+ asterixAppRuntimeContextProvider.getDatasetLifecycleManager(),logManager,
this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
this.checkpointThread.start();
} else {