ASTERIXDB-1053: Allow lazy loading for persistent local resources
- Allow indexes to be accessed by name.
- Allow lazy loading for persistent local resources.
- Caching for local resources.
Change-Id: I48b9260a3280750145f6ddb3783673a299055910
Reviewed-on: https://asterix-gerrit.ics.uci.edu/344
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 13d7e21..35afd8b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -96,7 +96,6 @@
private AsterixTransactionProperties txnProperties;
private AsterixFeedProperties feedProperties;
-
private AsterixThreadExecutor threadExecutor;
private DatasetLifecycleManager indexLifecycleManager;
private IFileMapManager fileMapManager;
@@ -140,7 +139,7 @@
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
- ioManager);
+ ioManager, ncApplicationContext.getNodeId());
localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
@@ -149,10 +148,10 @@
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
txnProperties);
-
+
indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
- MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID,(LogManager)txnSubsystem.getLogManager());
-
+ MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, (LogManager) txnSubsystem.getLogManager());
+
isShuttingdown = false;
feedManager = new FeedManager(ncApplicationContext.getNodeId(), feedProperties,
@@ -243,7 +242,7 @@
public AsterixExternalProperties getExternalProperties() {
return externalProperties;
}
-
+
@Override
public AsterixFeedProperties getFeedProperties() {
return feedProperties;
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 1a3953d..1ad34da 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -84,7 +84,7 @@
MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
AsterixAppContextInfo.getInstance().getCCApplicationContext()
- .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
+ .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
setupWebServer(externalProperties);
@@ -92,17 +92,18 @@
setupJSONAPIServer(externalProperties);
jsonAPIServer.start();
- ExternalLibraryBootstrap.setUpExternaLibraries(false);
setupFeedServer(externalProperties);
feedServer.start();
- centralFeedManager = CentralFeedManager.getInstance();
- centralFeedManager.start();
waitUntilServerStart(webServer);
waitUntilServerStart(jsonAPIServer);
waitUntilServerStart(feedServer);
+ ExternalLibraryBootstrap.setUpExternaLibraries(false);
+ centralFeedManager = CentralFeedManager.getInstance();
+ centralFeedManager.start();
+
AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
(HyracksConnection) getNewHyracksClientConnection());
ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
@@ -182,7 +183,7 @@
feedServer.setHandler(context);
context.addServlet(new ServletHolder(new FeedServlet()), "/");
-
+
// add paths here
}
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index bb21465..63f862c 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -112,12 +112,6 @@
LOGGER.info("System is in a state: " + systemState);
}
- if (systemState != SystemState.NEW_UNIVERSE) {
- PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
- .getLocalResourceRepository();
- localResourceRepository.initialize(nodeId, null, false, runtimeContext.getResourceIdFactory());
- }
-
if (systemState == SystemState.CORRUPTED) {
recoveryMgr.startRecovery(true);
}
@@ -165,7 +159,7 @@
PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
.getLocalResourceRepository();
- localResourceRepository.initialize(nodeId, metadataProperties.getStores().get(nodeId)[0], true, null);
+ localResourceRepository.initialize(nodeId, metadataProperties.getStores().get(nodeId)[0]);
}
IAsterixStateProxy proxy = null;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 9df4970..741e106 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -77,8 +77,15 @@
}
@Override
- public synchronized IIndex getIndex(long resourceID) throws HyracksDataException {
- DatasetInfo dsInfo = datasetInfos.get(getDIDfromRID(resourceID));
+ public synchronized IIndex getIndex(String resourceName) throws HyracksDataException {
+ int datasetID = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
+ return getIndex(datasetID, resourceID);
+ }
+
+ @Override
+ public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+ DatasetInfo dsInfo = datasetInfos.get(datasetID);
if (dsInfo == null) {
return null;
}
@@ -90,8 +97,9 @@
}
@Override
- public synchronized void register(long resourceID, IIndex index) throws HyracksDataException {
- int did = getDIDfromRID(resourceID);
+ public synchronized void register(String resourceName, IIndex index) throws HyracksDataException {
+ int did = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
dsInfo = getDatasetInfo(did);
@@ -106,17 +114,27 @@
dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index));
}
- private int getDIDfromRID(long resourceID) throws HyracksDataException {
- LocalResource lr = resourceRepository.getResourceById(resourceID);
+ private int getDIDfromResourceName(String resourceName) throws HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceByName(resourceName);
if (lr == null) {
return -1;
}
return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
}
+ private long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
+ LocalResource lr = resourceRepository.getResourceByName(resourceName);
+ if (lr == null) {
+ return -1;
+ }
+ return lr.getResourceId();
+ }
+
@Override
- public synchronized void unregister(long resourceID) throws HyracksDataException {
- int did = getDIDfromRID(resourceID);
+ public synchronized void unregister(String resourceName) throws HyracksDataException {
+ int did = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
+
DatasetInfo dsInfo = datasetInfos.get(did);
IndexInfo iInfo = dsInfo.indexes.get(resourceID);
@@ -155,20 +173,15 @@
dsInfo.indexes.remove(resourceID);
if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty() && !dsInfo.isExternal) {
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
- assert vbcs != null;
- for (IVirtualBufferCache vbc : vbcs) {
- used -= (vbc.getNumPages() * vbc.getPageSize());
- }
- datasetInfos.remove(did);
- datasetVirtualBufferCaches.remove(did);
- datasetOpTrackers.remove(did);
+ removeDatasetFromCache(dsInfo.datasetID);
}
}
@Override
- public synchronized void open(long resourceID) throws HyracksDataException {
- int did = getDIDfromRID(resourceID);
+ public synchronized void open(String resourceName) throws HyracksDataException {
+ int did = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
+
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null || !dsInfo.isRegistered) {
throw new HyracksDataException("Failed to open index with resource ID " + resourceID
@@ -260,8 +273,10 @@
}
@Override
- public synchronized void close(long resourceID) throws HyracksDataException {
- int did = getDIDfromRID(resourceID);
+ public synchronized void close(String resourceName) throws HyracksDataException {
+ int did = getDIDfromResourceName(resourceName);
+ long resourceID = getResourceIDfromResourceName(resourceName);
+
DatasetInfo dsInfo = datasetInfos.get(did);
if (dsInfo == null) {
throw new HyracksDataException("No index found with resourceID " + resourceID);
@@ -306,6 +321,17 @@
}
}
+ private void removeDatasetFromCache(int datasetID) {
+ List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(datasetID);
+ assert vbcs != null;
+ for (IVirtualBufferCache vbc : vbcs) {
+ used -= (vbc.getNumPages() * vbc.getPageSize());
+ }
+ datasetInfos.remove(datasetID);
+ datasetVirtualBufferCaches.remove(datasetID);
+ datasetOpTrackers.remove(datasetID);
+ }
+
public ILSMOperationTracker getOperationTracker(int datasetID) {
synchronized (datasetOpTrackers) {
ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
@@ -505,7 +531,6 @@
}
if (asyncFlush) {
-
for (IndexInfo iInfo : dsInfo.indexes.values()) {
ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -552,10 +577,7 @@
}
dsInfo.isOpen = false;
- List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
- for (IVirtualBufferCache vbc : vbcs) {
- used -= vbc.getNumPages() * vbc.getPageSize();
- }
+ removeDatasetFromCache(dsInfo.datasetID);
}
@Override
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 41f86b4..a5dcf18 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -58,7 +58,7 @@
AbstractLSMIndex lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
try {
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
- indexHelper.getResourceID(), lsmIndex, ctx);
+ indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
if (tupleFilterFactory != null) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 2b1d678..d600f57 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -284,9 +284,10 @@
private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
throws Exception {
long resourceID = metadataIndex.getResourceID();
- ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
+ String resourceName = metadataIndex.getFile().toString();
+ ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName);
try {
- indexLifecycleManager.open(resourceID);
+ indexLifecycleManager.open(resourceName);
// prepare a Callback for logging
IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
@@ -307,7 +308,7 @@
} catch (Exception e) {
throw e;
} finally {
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(resourceName);
}
}
@@ -633,9 +634,10 @@
private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
throws Exception {
long resourceID = metadataIndex.getResourceID();
- ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
+ String resourceName = metadataIndex.getFile().toString();
+ ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName);
try {
- indexLifecycleManager.open(resourceID);
+ indexLifecycleManager.open(resourceName);
// prepare a Callback for logging
IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
metadataIndex, lsmIndex, IndexOperation.DELETE);
@@ -653,7 +655,7 @@
} catch (Exception e) {
throw e;
} finally {
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(resourceName);
}
}
@@ -966,9 +968,9 @@
StringBuilder sb = new StringBuilder();
try {
IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
- long resourceID = index.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ String resourceName = index.getFile().toString();
+ IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
+ indexLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -986,12 +988,11 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(resourceName);
index = MetadataPrimaryIndexes.DATASET_DATASET;
- resourceID = index.getResourceID();
- indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ indexInstance = indexLifecycleManager.getIndex(resourceName);
+ indexLifecycleManager.open(resourceName);
indexAccessor = indexInstance
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1009,12 +1010,11 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(resourceName);
index = MetadataPrimaryIndexes.INDEX_DATASET;
- resourceID = index.getResourceID();
- indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ indexInstance = indexLifecycleManager.getIndex(resourceName);
+ indexLifecycleManager.open(resourceName);
indexAccessor = indexInstance
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1033,7 +1033,7 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(resourceName);
} catch (Exception e) {
e.printStackTrace();
}
@@ -1043,9 +1043,9 @@
private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
- long resourceID = index.getResourceID();
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ String resourceName = index.getFile().toString();
+ IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
+ indexLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1074,16 +1074,16 @@
} finally {
rangeCursor.close();
}
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(resourceName);
}
@Override
public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException {
int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
- long resourceID = MetadataPrimaryIndexes.DATASET_DATASET.getResourceID();
try {
- IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
- indexLifecycleManager.open(resourceID);
+ String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
+ IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
+ indexLifecycleManager.open(resourceName);
try {
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
@@ -1110,7 +1110,7 @@
rangeCursor.close();
}
} finally {
- indexLifecycleManager.close(resourceID);
+ indexLifecycleManager.close(resourceName);
}
} catch (Exception e) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 83d6c8f..c94133a 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -420,11 +420,11 @@
localResourceMetadata, LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
- indexLifecycleManager.register(resourceID, lsmBtree);
+ indexLifecycleManager.register(path, lsmBtree);
} else {
final LocalResource resource = localResourceRepository.getResourceByName(path);
resourceID = resource.getResourceId();
- lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
+ lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resource.getResourceName());
if (lsmBtree == null) {
lsmBtree = LSMBTreeUtils.createLSMTree(
virtualBufferCaches,
@@ -439,7 +439,7 @@
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
.createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true);
- indexLifecycleManager.register(resourceID, lsmBtree);
+ indexLifecycleManager.register(path, lsmBtree);
}
}
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 447c762..b9a26a0 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -109,13 +109,18 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-common</artifactId>
- <version>0.8.7-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>0.8.7-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 21b90ed..4c2f25d 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -50,13 +50,13 @@
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index df29491..9c897f2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -47,13 +47,12 @@
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
- IHyracksTaskContext ctx) throws HyracksDataException {
-
+ public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 5f9fd0e..4de0749 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -34,25 +34,26 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
- IModificationOperationCallbackFactory {
+public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
+ implements IModificationOperationCallbackFactory {
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId,
+ int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp,
+ byte resourceType) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 8a51c20..c2f56a0 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -34,25 +34,26 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class TempDatasetSecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
- IModificationOperationCallbackFactory {
+public class TempDatasetSecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
+ implements IModificationOperationCallbackFactory {
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId,
+ int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp,
+ byte resourceType) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
this.indexOp = indexOp;
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
- IHyracksTaskContext ctx) throws HyracksDataException {
+ public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+ Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getIndexLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
if (index == null) {
throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index abc98c7..1eda9cc 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -25,10 +25,8 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,7 +34,9 @@
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
import org.apache.hyracks.storage.common.file.LocalResource;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
public class PersistentLocalResourceRepository implements ILocalResourceRepository {
@@ -46,14 +46,14 @@
private static final String ROOT_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata";
private static final long ROOT_LOCAL_RESOURCE_ID = -4321;
private static final String METADATA_FILE_NAME = ".metadata";
- private Map<String, LocalResource> name2ResourceMap = new HashMap<String, LocalResource>();
- private Map<Long, LocalResource> id2ResourceMap = new HashMap<Long, LocalResource>();
- private final int numIODevices;
+ private final Cache<String, LocalResource> resourceCache;
+ private final String nodeId;
+ private static final int MAX_CACHED_RESOURCES = 1000;
- public PersistentLocalResourceRepository(List<IODeviceHandle> devices) throws HyracksDataException {
- numIODevices = devices.size();
- this.mountPoints = new String[numIODevices];
- for (int i = 0; i < numIODevices; i++) {
+ public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
+ mountPoints = new String[devices.size()];
+ this.nodeId = nodeId;
+ for (int i = 0; i < mountPoints.length; i++) {
String mountPoint = devices.get(i).getPath().getPath();
File mountPointDir = new File(mountPoint);
if (!mountPointDir.exists()) {
@@ -65,167 +65,89 @@
mountPoints[i] = new String(mountPoint);
}
}
+
+ resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
}
private String prepareRootMetaDataFileName(String mountPoint, String nodeId, int ioDeviceId) {
return mountPoint + ROOT_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId;
}
- public void initialize(String nodeId, String rootDir, boolean isNewUniverse, ResourceIdFactory resourceIdFactory)
- throws HyracksDataException {
+ public void initialize(String nodeId, String rootDir) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Initializing local resource repository ... ");
}
- if (isNewUniverse) {
- //#. if the rootMetadataFile doesn't exist, create it and return.
- for (int i = 0; i < numIODevices; i++) {
- String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
- + ROOT_METADATA_FILE_NAME_PREFIX;
- File rootMetadataFile = new File(rootMetadataFileName);
-
- File rootMetadataDir = new File(prepareRootMetaDataFileName(mountPoints[i], nodeId, i));
- if (!rootMetadataDir.exists()) {
- boolean success = rootMetadataDir.mkdirs();
- if (!success) {
- throw new IllegalStateException(
- "Unable to create root metadata directory of PersistentLocalResourceRepository in "
- + rootMetadataDir.getAbsolutePath());
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
- }
- }
-
- rootMetadataFile.delete();
- String mountedRootDir;
- if (rootDir.startsWith(System.getProperty("file.separator"))) {
- mountedRootDir = new String(mountPoints[i]
- + rootDir.substring(System.getProperty("file.separator").length()));
- } else {
- mountedRootDir = new String(mountPoints[i] + rootDir);
- }
- LocalResource rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0,
- mountedRootDir);
- insert(rootLocalResource);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Completed the initialization of the local resource repository");
- }
- }
- return;
- }
-
- FilenameFilter filter = new FilenameFilter() {
- public boolean accept(File dir, String name) {
- if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
- return true;
- } else {
- return false;
- }
- }
- };
-
- long maxResourceId = 0;
- for (int i = 0; i < numIODevices; i++) {
+ //if the rootMetadataFile doesn't exist, create it.
+ for (int i = 0; i < mountPoints.length; i++) {
String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
+ ROOT_METADATA_FILE_NAME_PREFIX;
File rootMetadataFile = new File(rootMetadataFileName);
- //#. if the rootMetadataFile exists, read it and set this.rootDir.
- LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
- String mountedRootDir = (String) rootLocalResource.getResourceObject();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("The root directory of the local resource repository is " + mountedRootDir);
- }
- //#. load all local resources.
- File rootDirFile = new File(mountedRootDir);
- if (!rootDirFile.exists()) {
- //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+ File rootMetadataDir = new File(prepareRootMetaDataFileName(mountPoints[i], nodeId, i));
+ if (!rootMetadataDir.exists()) {
+ boolean success = rootMetadataDir.mkdirs();
+ if (!success) {
+ throw new IllegalStateException(
+ "Unable to create root metadata directory of PersistentLocalResourceRepository in "
+ + rootMetadataDir.getAbsolutePath());
+ }
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("The root directory of the local resource repository doesn't exist: there is no local resource.");
- LOGGER.info("Completed the initialization of the local resource repository");
+ LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
}
- continue;
}
- File[] dataverseFileList = rootDirFile.listFiles();
- if (dataverseFileList == null) {
- throw new HyracksDataException("Metadata dataverse doesn't exist.");
+ rootMetadataFile.delete();
+ String mountedRootDir;
+ if (rootDir.startsWith(System.getProperty("file.separator"))) {
+ mountedRootDir = new String(mountPoints[i]
+ + rootDir.substring(System.getProperty("file.separator").length()));
+ } else {
+ mountedRootDir = new String(mountPoints[i] + rootDir);
}
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] ioDevicesList = indexFile.listFiles();
- if (ioDevicesList != null) {
- for (File ioDeviceFile : ioDevicesList) {
- if (ioDeviceFile.isDirectory()) {
- File[] metadataFiles = ioDeviceFile.listFiles(filter);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- id2ResourceMap.put(localResource.getResourceId(), localResource);
- name2ResourceMap
- .put(localResource.getResourceName(), localResource);
- maxResourceId = Math.max(localResource.getResourceId(),
- maxResourceId);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("loaded local resource - [id: "
- + localResource.getResourceId() + ", name: "
- + localResource.getResourceName() + "]");
- }
- }
- }
- }
- }
- }
- }
- }
- }
- }
+ LocalResource rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0,
+ mountedRootDir);
+ insert(rootLocalResource);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
}
}
- resourceIdFactory.initId(maxResourceId + 1);
+
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("The resource id factory is intialized with the value: " + (maxResourceId + 1));
LOGGER.info("Completed the initialization of the local resource repository");
}
}
@Override
- public LocalResource getResourceById(long id) throws HyracksDataException {
- return id2ResourceMap.get(id);
- }
-
- @Override
public LocalResource getResourceByName(String name) throws HyracksDataException {
- return name2ResourceMap.get(name);
+ LocalResource resource = resourceCache.getIfPresent(name);
+ if (resource == null) {
+ File resourceFile = getLocalResourceFileByName(name);
+ if (resourceFile.exists()) {
+ resource = readLocalResource(resourceFile);
+ resourceCache.put(name, resource);
+ }
+ }
+ return resource;
}
@Override
public synchronized void insert(LocalResource resource) throws HyracksDataException {
- long id = resource.getResourceId();
+ File resourceFile = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
- if (id2ResourceMap.containsKey(id)) {
+ if (resourceFile.exists()) {
throw new HyracksDataException("Duplicate resource");
}
if (resource.getResourceId() != ROOT_LOCAL_RESOURCE_ID) {
- id2ResourceMap.put(id, resource);
- name2ResourceMap.put(resource.getResourceName(), resource);
+ resourceCache.put(resource.getResourceName(), resource);
}
FileOutputStream fos = null;
ObjectOutputStream oosToFos = null;
try {
- fos = new FileOutputStream(getFileName(resource.getResourceName(), resource.getResourceId()));
+ fos = new FileOutputStream(resourceFile);
oosToFos = new ObjectOutputStream(fos);
oosToFos.writeObject(resource);
oosToFos.flush();
@@ -250,40 +172,133 @@
}
@Override
- public synchronized void deleteResourceById(long id) throws HyracksDataException {
- LocalResource resource = id2ResourceMap.get(id);
- if (resource == null) {
- throw new HyracksDataException("Resource doesn't exist");
- }
- id2ResourceMap.remove(id);
- name2ResourceMap.remove(resource.getResourceName());
- File file = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
- file.delete();
- }
-
- @Override
public synchronized void deleteResourceByName(String name) throws HyracksDataException {
- LocalResource resource = name2ResourceMap.get(name);
- if (resource == null) {
+ File resourceFile = getLocalResourceFileByName(name);
+ if (resourceFile.exists()) {
+ resourceFile.delete();
+ resourceCache.invalidate(name);
+ } else {
throw new HyracksDataException("Resource doesn't exist");
}
- id2ResourceMap.remove(resource.getResourceId());
- name2ResourceMap.remove(name);
- File file = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
- file.delete();
+ }
+
+ private static File getLocalResourceFileByName(String resourceName) {
+ return new File(resourceName + File.separator + METADATA_FILE_NAME);
+ }
+
+ public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
+ //TODO During recovery, the memory usage currently is proportional to the number of resources available.
+ //This could be fixed by traversing all resources on disk until the required resource is found.
+ HashMap<Long, LocalResource> resourcesMap = new HashMap<Long, LocalResource>();
+
+ for (int i = 0; i < mountPoints.length; i++) {
+ String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
+ + ROOT_METADATA_FILE_NAME_PREFIX;
+ File rootMetadataFile = new File(rootMetadataFileName);
+ if (!rootMetadataFile.exists()) {
+ continue;
+ }
+ //if the rootMetadataFile exists, read it and set it as mounting point root
+ LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
+ String mountedRootDir = (String) rootLocalResource.getResourceObject();
+
+ File rootDirFile = new File(mountedRootDir);
+ if (!rootDirFile.exists()) {
+ //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+ continue;
+ }
+
+ //load all local resources.
+ File[] dataverseFileList = rootDirFile.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] ioDevicesList = indexFile.listFiles();
+ if (ioDevicesList != null) {
+ for (File ioDeviceFile : ioDevicesList) {
+ if (ioDeviceFile.isDirectory()) {
+ File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ resourcesMap.put(localResource.getResourceId(), localResource);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ return resourcesMap;
}
@Override
- public List<LocalResource> getAllResources() throws HyracksDataException {
- List<LocalResource> resources = new ArrayList<LocalResource>();
- for (LocalResource resource : id2ResourceMap.values()) {
- resources.add(resource);
+ public long getMaxResourceID() throws HyracksDataException {
+ long maxResourceId = 0;
+
+ for (int i = 0; i < mountPoints.length; i++) {
+ String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
+ + ROOT_METADATA_FILE_NAME_PREFIX;
+ File rootMetadataFile = new File(rootMetadataFileName);
+ if (!rootMetadataFile.exists()) {
+ continue;
+ }
+
+ //if the rootMetadataFile exists, read it and set it as mounting point root
+ LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
+ String mountedRootDir = (String) rootLocalResource.getResourceObject();
+
+ File rootDirFile = new File(mountedRootDir);
+ if (!rootDirFile.exists()) {
+ continue;
+ }
+
+ //traverse all local resources.
+ File[] dataverseFileList = rootDirFile.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ if (indexFile.isDirectory()) {
+ File[] ioDevicesList = indexFile.listFiles();
+ if (ioDevicesList != null) {
+ for (File ioDeviceFile : ioDevicesList) {
+ if (ioDeviceFile.isDirectory()) {
+ File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
+ if (metadataFiles != null) {
+ for (File metadataFile : metadataFiles) {
+ LocalResource localResource = readLocalResource(metadataFile);
+ maxResourceId = Math.max(maxResourceId,
+ localResource.getResourceId());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
- return resources;
+
+ return maxResourceId;
}
private String getFileName(String baseDir, long resourceId) {
-
if (resourceId == ROOT_LOCAL_RESOURCE_ID) {
return baseDir;
} else {
@@ -323,4 +338,14 @@
}
}
}
+
+ private static final FilenameFilter METADATA_FILES_FILTER = new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index 0638cae..b6bb7dc 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -24,14 +24,16 @@
import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
public class PersistentLocalResourceRepositoryFactory implements ILocalResourceRepositoryFactory {
- private IIOManager ioManager;
+ private final IIOManager ioManager;
+ private final String nodeId;
- public PersistentLocalResourceRepositoryFactory(IIOManager ioManager) {
+ public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId) {
this.ioManager = ioManager;
+ this.nodeId = nodeId;
}
@Override
public ILocalResourceRepository createRepository() throws HyracksDataException {
- return new PersistentLocalResourceRepository(ioManager.getIODevices());
+ return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId);
}
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 730116a..f6880db 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -39,9 +39,9 @@
class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
- AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
+ AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
IIndexLifecycleManager ilm = new IndexLifecycleManager();
-
+
@Override
public AsterixThreadExecutor getThreadExecutor() {
return ate;
@@ -108,36 +108,39 @@
}
static class IndexLifecycleManager implements IIndexLifecycleManager {
-
- @Override
- public IIndex getIndex(long resourceID) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void register(long resourceID, IIndex index) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void unregister(long resourceID) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void open(long resourceID) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close(long resourceID) throws HyracksDataException {
- throw new UnsupportedOperationException();
- }
-
@Override
public List<IIndex> getOpenIndexes() {
throw new UnsupportedOperationException();
}
-
+
+ @Override
+ public void register(String resourceName, IIndex index) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void open(String resourceName) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close(String resourceName) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IIndex getIndex(String resourceName) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unregister(String resourceName) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+ throw new UnsupportedOperationException();
+ }
}
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index f59bc84..82ad32d 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -51,6 +51,7 @@
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
@@ -82,7 +83,6 @@
private final LogManager logMgr;
private final int checkpointHistory;
private final long SHARP_CHECKPOINT_LSN = -1;
-
/**
* A file at a known location that contains the LSN of the last log record
* traversed doing a successful checkpoint.
@@ -249,6 +249,8 @@
IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
+ Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
+ .loadAndGetAllResources();
//#. set log reader to the lowWaterMarkLsn again.
logReader.initializeScan(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -273,39 +275,38 @@
}
if (foundWinner) {
resourceId = logRecord.getResourceId();
- localResource = localResourceRepository.getResourceById(resourceId);
+ localResource = resourcesMap.get(resourceId);
+
+ /*******************************************************************
+ * [Notice]
+ * -> Issue
+ * Delete index may cause a problem during redo.
+ * The index operation to be redone couldn't be redone because the corresponding index
+ * may not exist in NC due to the possible index drop DDL operation.
+ * -> Approach
+ * Avoid the problem during redo.
+ * More specifically, the problem will be detected when the localResource of
+ * the corresponding index is retrieved, which will end up with 'null'.
+ * If null is returned, then just go and process the next
+ * log record.
+ *******************************************************************/
+ if (localResource == null) {
+ logRecord = logReader.next();
+ continue;
+ }
+ /*******************************************************************/
//get index instance from IndexLifeCycleManager
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
- index = (ILSMIndex) indexLifecycleManager.getIndex(resourceId);
+ index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName());
if (index == null) {
-
- /*******************************************************************
- * [Notice]
- * -> Issue
- * Delete index may cause a problem during redo.
- * The index operation to be redone couldn't be redone because the corresponding index
- * may not exist in NC due to the possible index drop DDL operation.
- * -> Approach
- * Avoid the problem during redo.
- * More specifically, the problem will be detected when the localResource of
- * the corresponding index is retrieved, which will end up with 'null' return from
- * localResourceRepository. If null is returned, then just go and process the next
- * log record.
- *******************************************************************/
- if (localResource == null) {
- logRecord = logReader.next();
- continue;
- }
- /*******************************************************************/
-
//#. create index instance and register to indexLifeCycleManager
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
localResource.getResourceName(), localResource.getPartition());
- indexLifecycleManager.register(resourceId, index);
- indexLifecycleManager.open(resourceId);
+ indexLifecycleManager.register(localResource.getResourceName(), index);
+ indexLifecycleManager.open(localResource.getResourceName());
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = (ILSMIndex) index;
@@ -342,7 +343,7 @@
//close all indexes
Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
for (long r : resourceIdList) {
- indexLifecycleManager.close(r);
+ indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
}
logReader.close();
@@ -356,11 +357,12 @@
}
@Override
- public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException {
+ public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
+ throws ACIDException, HyracksDataException {
long minMCTFirstLSN;
boolean nonSharpCheckpointSucceeded = false;
-
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting sharp checkpoint ... ");
}
@@ -372,7 +374,8 @@
// right after the new checkpoint file is written.
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager)txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+ DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager) txnSubsystem
+ .getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
//#. flush all in-memory components if it is the sharp checkpoint
if (isSharpCheckpoint) {
@@ -382,11 +385,10 @@
} else {
minMCTFirstLSN = getMinFirstLSN();
-
- if(minMCTFirstLSN >= nonSharpCheckpointTargetLSN){
+
+ if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
nonSharpCheckpointSucceeded = true;
- }
- else{
+ } else {
//flush datasets with indexes behind target checkpoint LSN
datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
}
@@ -439,10 +441,10 @@
}
}
- if(nonSharpCheckpointSucceeded){
+ if (nonSharpCheckpointSucceeded) {
logMgr.deleteOldLogFiles(minMCTFirstLSN);
}
-
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Completed sharp checkpoint.");
}
@@ -451,9 +453,9 @@
return minMCTFirstLSN;
}
- public long getMinFirstLSN() throws HyracksDataException
- {
- IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+ public long getMinFirstLSN() throws HyracksDataException {
+ IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getIndexLifecycleManager();
List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
long firstLSN;
//the min first lsn can only be the current append or smaller
@@ -463,8 +465,9 @@
for (IIndex index : openIndexList) {
- AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback)((ILSMIndex) index).getIOOperationCallback();
- if(!((AbstractLSMIndex)index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()){
+ AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
+ .getIOOperationCallback();
+ if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
firstLSN = ioCallback.getFirstLSN();
minFirstLSN = Math.min(minFirstLSN, firstLSN);
}
@@ -594,6 +597,7 @@
ILogReader logReader = logMgr.getLogReader(false);
logReader.initializeScan(firstLSN);
ILogRecord logRecord = null;
+
while (currentLSN < lastLSN) {
logRecord = logReader.next();
if (logRecord == null) {
@@ -710,7 +714,7 @@
private void undo(ILogRecord logRecord) {
try {
ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(logRecord.getResourceId());
+ .getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
@@ -728,7 +732,7 @@
private void redo(ILogRecord logRecord) {
try {
ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(logRecord.getResourceId());
+ .getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {