remove staticness of lifecyclemanager and asterixthreadexecutor/factory; remove unnecessary asterixappruntimecontext used only for recovery
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e56ed92..268ed0e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -15,8 +15,10 @@
package edu.uci.ics.asterix.api.common;
import java.io.IOException;
+import java.util.concurrent.Executor;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
@@ -30,17 +32,18 @@
import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
-import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -72,6 +75,7 @@
private AsterixStorageProperties storageProperties;
private AsterixTransactionProperties txnProperties;
+ private AsterixThreadExecutor threadExecutor;
private DatasetLifecycleManager indexLifecycleManager;
private IFileMapManager fileMapManager;
private IBufferCache bufferCache;
@@ -98,6 +102,7 @@
Logger.getLogger("edu.uci.ics").setLevel(externalProperties.getLogLevel());
+ threadExecutor = new AsterixThreadExecutor(ncApplicationContext.getThreadFactory());
fileMapManager = new AsterixFileMapManager();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
@@ -116,19 +121,17 @@
.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository);
- IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
- this);
- txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
- txnProperties);
+ txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), this, txnProperties);
isShuttingdown = false;
// The order of registration is important. The buffer cache must registered before recovery and transaction managers.
- LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) bufferCache);
- LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) indexLifecycleManager);
- LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
- LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLogManager());
- LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLockManager());
- LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
+ ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
+ lccm.register((ILifeCycleComponent) bufferCache);
+ lccm.register((ILifeCycleComponent) indexLifecycleManager);
+ lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
+ lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+ lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+ lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
}
public boolean isShuttingdown() {
@@ -220,4 +223,28 @@
public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
return indexLifecycleManager.getOperationTracker(datasetID);
}
+
+ @Override
+ public Executor getThreadExecutor() {
+ return threadExecutor;
+ }
+
+ @Override
+ public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary) {
+ if (isPrimary) {
+ return AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER;
+ } else {
+ return AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
+ }
+ }
+
+ @Override
+ public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider() {
+ return AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER;
+ }
+
+ @Override
+ public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider() {
+ return AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER;
+ }
}
\ No newline at end of file
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
deleted file mode 100644
index 712d993..0000000
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.api.common;
-
-import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
-import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
-import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
-import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
-
-public class AsterixAppRuntimeContextProviderForRecovery implements IAsterixAppRuntimeContextProvider {
-
- private final AsterixAppRuntimeContext asterixAppRuntimeContext;
-
- public AsterixAppRuntimeContextProviderForRecovery(AsterixAppRuntimeContext asterixAppRuntimeContext) {
- this.asterixAppRuntimeContext = asterixAppRuntimeContext;
- }
-
- @Override
- public IBufferCache getBufferCache() {
- return asterixAppRuntimeContext.getBufferCache();
- }
-
- @Override
- public IFileMapProvider getFileMapManager() {
- return asterixAppRuntimeContext.getFileMapManager();
- }
-
- @Override
- public ITransactionSubsystem getTransactionSubsystem() {
- return asterixAppRuntimeContext.getTransactionSubsystem();
- }
-
- @Override
- public IIndexLifecycleManager getIndexLifecycleManager() {
- return asterixAppRuntimeContext.getIndexLifecycleManager();
- }
-
- @Override
- public double getBloomFilterFalsePositiveRate() {
- return asterixAppRuntimeContext.getBloomFilterFalsePositiveRate();
- }
-
- @Override
- public ILSMMergePolicy getLSMMergePolicy() {
- return asterixAppRuntimeContext.getLSMMergePolicy();
- }
-
- @Override
- public ILSMIOOperationScheduler getLSMIOScheduler() {
- return asterixAppRuntimeContext.getLSMIOScheduler();
- }
-
- @Override
- public ILocalResourceRepository getLocalResourceRepository() {
- return asterixAppRuntimeContext.getLocalResourceRepository();
- }
-
- @Override
- public ResourceIdFactory getResourceIdFactory() {
- return asterixAppRuntimeContext.getResourceIdFactory();
- }
-
- @Override
- public IIOManager getIOManager() {
- return asterixAppRuntimeContext.getIOManager();
- }
-
- @Override
- public IVirtualBufferCache getVirtualBufferCache(int datasetID) {
- return asterixAppRuntimeContext.getVirtualBufferCache(datasetID);
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER;
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary) {
- if (isPrimary) {
- return AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER;
- } else {
- return AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
- }
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER;
- }
-
- @Override
- public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider() {
- return AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER;
- }
-
- @Override
- public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
- return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
- }
-}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index dc201dd..e48c3e1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.api.application.ICCApplicationEntryPoint;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
@@ -56,7 +57,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix cluster controller");
}
- appCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
+ appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
AsterixAppContextInfo.initialize(appCtx);
proxy = AsterixStateProxy.registerRemoteObject();
appCtx.setDistributedState(proxy);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 96d0617..7725111 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -36,6 +36,7 @@
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
@@ -50,7 +51,7 @@
@Override
public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
- ncAppCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
+ ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager()));
ncApplicationContext = ncAppCtx;
nodeId = ncApplicationContext.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
@@ -59,7 +60,6 @@
JVMShutdownHook sHook = new JVMShutdownHook(this);
Runtime.getRuntime().addShutdownHook(sHook);
-
runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
runtimeContext.initialize();
ncApplicationContext.setApplicationObject(runtimeContext);
@@ -99,7 +99,7 @@
MetadataBootstrap.stopUniverse();
}
- LifeCycleComponentManager.INSTANCE.stopAll(false);
+ ncApplicationContext.getLifeCycleComponentManager().stopAll(false);
runtimeContext.deinitialize();
} else {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -144,7 +144,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting lifecycle components");
}
-
+
Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
String key = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
String value = metadataProperties.getCoredumpPath(nodeId);
@@ -152,12 +152,13 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Coredump directory for NC is: " + value);
}
- LifeCycleComponentManager.INSTANCE.configure(lifecycleMgmtConfiguration);
+ ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
+ lccm.configure(lifecycleMgmtConfiguration);
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Configured:" + LifeCycleComponentManager.INSTANCE);
+ LOGGER.info("Configured:" + lccm);
}
-
- LifeCycleComponentManager.INSTANCE.startAll();
+
+ lccm.startAll();
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
recoveryMgr.checkpoint(true);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
index 14975ff..990278c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -16,13 +16,13 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
public class AsterixThreadExecutor implements Executor {
- public final static AsterixThreadExecutor INSTANCE = new AsterixThreadExecutor();
- private final Executor executor = Executors.newCachedThreadPool(AsterixThreadFactory.INSTANCE);
+ private final Executor executor;
- private AsterixThreadExecutor() {
-
+ public AsterixThreadExecutor(ThreadFactory threadFactory) {
+ executor = Executors.newCachedThreadPool(threadFactory);
}
@Override
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
index e14e549..d86a4b3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
@@ -16,14 +16,14 @@
import java.util.concurrent.ThreadFactory;
-import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
public class AsterixThreadFactory implements ThreadFactory {
- public final static AsterixThreadFactory INSTANCE = new AsterixThreadFactory();
+ private final ILifeCycleComponentManager lccm;
- private AsterixThreadFactory() {
-
+ public AsterixThreadFactory(ILifeCycleComponentManager lifeCycleComponentManager) {
+ this.lccm = lifeCycleComponentManager;
}
@Override
@@ -34,7 +34,7 @@
} else {
t = new Thread(r);
}
- t.setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
+ t.setUncaughtExceptionHandler(lccm);
return t;
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
index d035303..cfc5221 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -15,12 +15,15 @@
package edu.uci.ics.asterix.common.api;
import java.io.IOException;
+import java.util.concurrent.Executor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -32,6 +35,10 @@
public interface IAsterixAppRuntimeContext {
+ public IIOManager getIOManager();
+
+ public Executor getThreadExecutor();
+
public ITransactionSubsystem getTransactionSubsystem();
public boolean isShuttingdown();
@@ -63,4 +70,10 @@
public double getBloomFilterFalsePositiveRate();
public IVirtualBufferCache getVirtualBufferCache(int datasetID);
+
+ public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary);
+
+ public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider();
+
+ public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider();
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/ILocalResourceMetadata.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/ILocalResourceMetadata.java
index 90c9bad..64580a1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/ILocalResourceMetadata.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/ILocalResourceMetadata.java
@@ -16,14 +16,13 @@
import java.io.Serializable;
-import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
public interface ILocalResourceMetadata extends Serializable {
- public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) throws HyracksDataException;
+ public ILSMIndex createIndexInstance(IAsterixAppRuntimeContext runtimeContext, String filePath, int partition)
+ throws HyracksDataException;
public int getDatasetID();
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
deleted file mode 100644
index 05ac025..0000000
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.common.transactions;
-
-import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
-import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
-
-public interface IAsterixAppRuntimeContextProvider {
-
- public IBufferCache getBufferCache();
-
- public IFileMapProvider getFileMapManager();
-
- public ITransactionSubsystem getTransactionSubsystem();
-
- public IIndexLifecycleManager getIndexLifecycleManager();
-
- public double getBloomFilterFalsePositiveRate();
-
- public ILSMMergePolicy getLSMMergePolicy();
-
- public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
-
- public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary);
-
- public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider();
-
- public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider();
-
- public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider();
-
- public ILSMIOOperationScheduler getLSMIOScheduler();
-
- public ILocalResourceRepository getLocalResourceRepository();
-
- public ResourceIdFactory getResourceIdFactory();
-
- public IIOManager getIOManager();
-
- public IVirtualBufferCache getVirtualBufferCache(int datasetID);
-}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
index 63ee5d0..1d06d7c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.asterix.common.transactions;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
public interface ITransactionSubsystem {
@@ -29,7 +30,7 @@
public ILoggerRepository getTreeLoggerRepository();
- public IAsterixAppRuntimeContextProvider getAsterixAppRuntimeContextProvider();
+ public IAsterixAppRuntimeContext getAsterixAppRuntimeContext();
public String getId();
}
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index f8d5ea2..7366c54 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -153,6 +153,11 @@
<artifactId>jdom</artifactId>
<version>1.0</version>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>0.8.1-SNAPSHOT</version>
+ </dependency>
</dependencies>
</project>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index a48cfb8..64f947c 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -75,6 +75,6 @@
} catch (Exception e) {
throw new HyracksDataException("initialization of adapter failed", e);
}
- return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
+ return new FeedIntakeOperatorNodePushable(ctx, feedId, adapter, partition);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
index 31470f3..699f3d6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
@@ -17,7 +17,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
-import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
@@ -25,6 +25,7 @@
import edu.uci.ics.asterix.external.feed.lifecycle.IFeedManager;
import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
@@ -33,6 +34,7 @@
*/
public class FeedIntakeOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+ private final IHyracksTaskContext ctx;
private final IDatasourceAdapter adapter;
private final int partition;
private final IFeedManager feedManager;
@@ -40,7 +42,9 @@
private final LinkedBlockingQueue<IFeedMessage> inbox;
private FeedInboxMonitor feedInboxMonitor;
- public FeedIntakeOperatorNodePushable(FeedId feedId, IDatasourceAdapter adapter, int partition) {
+ public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedId feedId, IDatasourceAdapter adapter,
+ int partition) {
+ this.ctx = ctx;
this.adapter = adapter;
this.partition = partition;
this.feedManager = (IFeedManager) FeedManager.INSTANCE;
@@ -52,7 +56,8 @@
public void open() throws HyracksDataException {
if (adapter instanceof IManagedFeedAdapter) {
feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
- AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
+ ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+ .getThreadExecutor().execute(feedInboxMonitor);
feedManager.registerFeedMsgQueue(feedId, inbox);
}
writer.open();
@@ -82,7 +87,7 @@
@Override
public void close() throws HyracksDataException {
-
+
}
@Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 428a04e..b54f198 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -50,7 +50,7 @@
IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContext()
.getIndexLifecycleManager();
ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
if (index == null) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 55ca399..82981b0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -46,7 +46,7 @@
IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContext()
.getIndexLifecycleManager();
ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
if (index == null) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 8ce0174..75a50f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -16,9 +16,9 @@
import java.io.File;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -46,16 +46,15 @@
}
@Override
- public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) {
+ public ILSMIndex createIndexInstance(IAsterixAppRuntimeContext runtimeContext, String filePath, int partition) {
FileReference file = new FileReference(new File(filePath));
- IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
- LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCache, file, runtimeContextProvider
- .getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
- bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
- : new BaseOperationTracker(LSMBTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(isPrimary));
+ IVirtualBufferCache virtualBufferCache = runtimeContext.getVirtualBufferCache(datasetID);
+ LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCache, file, runtimeContext.getBufferCache(),
+ runtimeContext.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields, runtimeContext
+ .getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
+ isPrimary ? runtimeContext.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE), runtimeContext.getLSMIOScheduler(),
+ runtimeContext.getLSMBTreeIOOperationCallbackProvider(isPrimary));
return lsmBTree;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index b3da3ee..a22fe27 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -14,9 +14,9 @@
*/
package edu.uci.ics.asterix.transaction.management.resource;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -51,28 +51,26 @@
}
@Override
- public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) throws HyracksDataException {
- IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
+ public ILSMIndex createIndexInstance(IAsterixAppRuntimeContext runtimeContext, String filePath, int partition)
+ throws HyracksDataException {
+ IVirtualBufferCache virtualBufferCache = runtimeContext.getVirtualBufferCache(datasetID);
try {
if (isPartitioned) {
- return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCache, runtimeContextProvider
- .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
- tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), new BaseOperationTracker(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider
- .getLSMInvertedIndexIOOperationCallbackProvider());
+ return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCache,
+ runtimeContext.getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+ tokenCmpFactories, tokenizerFactory, runtimeContext.getBufferCache(), filePath,
+ runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
+ new BaseOperationTracker(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE),
+ runtimeContext.getLSMIOScheduler(),
+ runtimeContext.getLSMInvertedIndexIOOperationCallbackProvider());
} else {
- return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCache, runtimeContextProvider
- .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
- tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
- .getLSMMergePolicy(), new BaseOperationTracker(
- LSMInvertedIndexIOOperationCallbackFactory.INSTANCE), runtimeContextProvider
- .getLSMIOScheduler(), runtimeContextProvider
- .getLSMInvertedIndexIOOperationCallbackProvider());
+ return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCache,
+ runtimeContext.getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+ tokenCmpFactories, tokenizerFactory, runtimeContext.getBufferCache(), filePath,
+ runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
+ new BaseOperationTracker(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE),
+ runtimeContext.getLSMIOScheduler(),
+ runtimeContext.getLSMInvertedIndexIOOperationCallbackProvider());
}
} catch (IndexException e) {
throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 9b9faef..49348018 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -16,9 +16,9 @@
import java.io.File;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
-import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -55,17 +55,17 @@
}
@Override
- public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition) throws HyracksDataException {
+ public ILSMIndex createIndexInstance(IAsterixAppRuntimeContext runtimeContext, String filePath, int partition)
+ throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
- IVirtualBufferCache virtualBufferCache = runtimeContextProvider.getVirtualBufferCache(datasetID);
+ IVirtualBufferCache virtualBufferCache = runtimeContext.getVirtualBufferCache(datasetID);
try {
- return LSMRTreeUtils.createLSMTree(virtualBufferCache, file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
- runtimeContextProvider.getLSMMergePolicy(), new BaseOperationTracker(
- LSMRTreeIOOperationCallbackFactory.INSTANCE), runtimeContextProvider.getLSMIOScheduler(),
- runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory);
+ return LSMRTreeUtils.createLSMTree(virtualBufferCache, file, runtimeContext.getBufferCache(),
+ runtimeContext.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
+ valueProviderFactories, rtreePolicyType, runtimeContext.getBloomFilterFalsePositiveRate(),
+ runtimeContext.getLSMMergePolicy(), new BaseOperationTracker(
+ LSMRTreeIOOperationCallbackFactory.INSTANCE), runtimeContext.getLSMIOScheduler(),
+ runtimeContext.getLSMRTreeIOOperationCallbackProvider(), linearizeCmpFactory);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index b09cd52..9e43d10 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -94,12 +94,13 @@
this.waiterLatch = new ReentrantReadWriteLock(true);
this.jobHT = new HashMap<JobId, JobInfo>();
this.datasetResourceHT = new HashMap<DatasetId, DatasetLockInfo>();
- this.entityInfoManager = new EntityInfoManager(txnSubsystem.getTransactionProperties().getLockManagerShrinkTimer());
+ this.entityInfoManager = new EntityInfoManager(txnSubsystem.getTransactionProperties()
+ .getLockManagerShrinkTimer());
this.lockWaiterManager = new LockWaiterManager();
this.entityLockInfoManager = new EntityLockInfoManager(entityInfoManager, lockWaiterManager);
this.deadlockDetector = new DeadlockDetector(jobHT, datasetResourceHT, entityLockInfoManager,
entityInfoManager, lockWaiterManager);
- this.toutDetector = new TimeOutDetector(this);
+ this.toutDetector = new TimeOutDetector(this, txnSubsystem.getAsterixAppRuntimeContext().getThreadExecutor());
this.tempDatasetIdObj = new DatasetId(0);
this.consecutiveWakeupContext = new ConsecutiveWakeupContext();
@@ -109,7 +110,7 @@
this.lockRequestTracker = new LockRequestTracker();
}
}
-
+
public AsterixTransactionProperties getTransactionProperties() {
return this.txnSubsystem.getTransactionProperties();
}
@@ -200,7 +201,8 @@
if (doEscalate) {
throw new IllegalStateException(
"ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
- + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
+ + txnSubsystem.getTransactionProperties()
+ .getEntityToDatasetLockEscalationThreshold());
}
}
}
@@ -782,7 +784,8 @@
if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
if (!isInstant && datasetLockMode == LockMode.IS) {
- jobInfo.decreaseDatasetISLockCount(datasetId.getId(), txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
+ jobInfo.decreaseDatasetISLockCount(datasetId.getId(), txnSubsystem.getTransactionProperties()
+ .getEntityToDatasetLockEscalationThreshold());
}
}
@@ -1297,7 +1300,8 @@
//We don't want to allow the lock escalation when there is a first lock request on a dataset.
throw new IllegalStateException(
"ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
- + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
+ + txnSubsystem.getTransactionProperties()
+ .getEntityToDatasetLockEscalationThreshold());
}
}
}
@@ -2097,7 +2101,8 @@
try {
StringBuilder sb = new StringBuilder();
sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
- sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: " + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
+ sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: "
+ + txnSubsystem.getTransactionProperties().getEntityToDatasetLockEscalationThreshold());
sb.append("\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+ entityLockInfoManager.getShrinkTimerThreshold());
sb.append("\nSHRINK_TIMER_THRESHOLD (entityInfoManager): " + entityInfoManager.getShrinkTimerThreshold());
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
index d9c6097..cab0f7c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
@@ -15,8 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
import java.util.LinkedList;
+import java.util.concurrent.Executor;
-import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
/**
@@ -36,14 +36,14 @@
int timeoutThreshold;
int sweepThreshold;
- public TimeOutDetector(LockManager lockMgr) {
+ public TimeOutDetector(LockManager lockMgr, Executor threadExecutor) {
this.victimList = new LinkedList<LockWaiter>();
this.lockMgr = lockMgr;
this.trigger = new Thread(new TimeoutTrigger(this));
this.timeoutThreshold = lockMgr.getTransactionProperties().getTimeoutWaitThreshold();
this.sweepThreshold = lockMgr.getTransactionProperties().getTimeoutSweepThreshold();
trigger.setDaemon(true);
- AsterixThreadExecutor.INSTANCE.execute(trigger);
+ threadExecutor.execute(trigger);
}
public void sweep() throws ACIDException {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
index b95b943..1d08fb9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
@@ -44,7 +44,7 @@
MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
IIndex index;
try {
- index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
+ index = (IIndex) txnSubsystem.getAsterixAppRuntimeContext().getIndexLifecycleManager()
.getIndex(resourceId);
} catch (HyracksDataException e) {
throw new ACIDException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
index 16ffa69..522526f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -48,8 +48,7 @@
IIndex index;
try {
- index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(resourceId);
+ index = (IIndex) txnSubsystem.getAsterixAppRuntimeContext().getIndexLifecycleManager().getIndex(resourceId);
} catch (HyracksDataException e1) {
throw new ACIDException("Cannot undo: unable to find index");
}
@@ -119,8 +118,7 @@
IIndex index;
try {
- index = (IIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
- .getIndex(resourceId);
+ index = (IIndex) txnSubsystem.getAsterixAppRuntimeContext().getIndexLifecycleManager().getIndex(resourceId);
} catch (HyracksDataException e1) {
throw new ACIDException("Cannot redo: unable to find index");
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index f037521..99f035e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -31,7 +31,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.FileBasedBuffer;
import edu.uci.ics.asterix.common.transactions.FileUtil;
@@ -55,7 +54,7 @@
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
- private final TransactionSubsystem provider;
+ private final TransactionSubsystem txnSubsystem;
private LogManagerProperties logManagerProperties;
private LogPageFlushThread logPageFlusher;
private final int logPageSize;
@@ -110,8 +109,9 @@
}
public LogManager(TransactionSubsystem provider) throws ACIDException {
- this.provider = provider;
- logManagerProperties = new LogManagerProperties(this.provider.getTransactionProperties(), this.provider.getId());
+ this.txnSubsystem = provider;
+ logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
+ this.txnSubsystem.getId());
logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
statLogSize = 0;
@@ -119,7 +119,7 @@
}
public LogManager(TransactionSubsystem provider, String nodeId) throws ACIDException {
- this.provider = provider;
+ this.txnSubsystem = provider;
logManagerProperties = new LogManagerProperties(provider.getTransactionProperties(), nodeId);
logPageSize = logManagerProperties.getLogPageSize();
initLogManager();
@@ -156,7 +156,7 @@
*/
logPageFlusher = new LogPageFlushThread(this);
logPageFlusher.setDaemon(true);
- AsterixThreadExecutor.INSTANCE.execute(logPageFlusher);
+ txnSubsystem.getAsterixAppRuntimeContext().getThreadExecutor().execute(logPageFlusher);
}
public int getLogPageIndex(long lsnValue) {
@@ -604,7 +604,7 @@
}
public void renewLogFiles() throws ACIDException {
- closeLogPages();
+ closeLogPages();
List<String> logFileNames = LogUtil.getLogFiles(logManagerProperties);
for (String name : logFileNames) {
File file = new File(LogUtil.getLogFilePath(logManagerProperties, Long.parseLong(name)));
@@ -706,7 +706,7 @@
@Override
public TransactionSubsystem getTransactionSubsystem() {
- return provider;
+ return txnSubsystem;
}
static AtomicInteger t = new AtomicInteger();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index ad1db1f..acc59fa 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -36,10 +36,10 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.IBuffer;
import edu.uci.ics.asterix.common.transactions.ILogCursor;
import edu.uci.ics.asterix.common.transactions.ILogFilter;
@@ -262,7 +262,7 @@
boolean foundWinnerTxn;
//#. get indexLifeCycleManager
- IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+ IAsterixAppRuntimeContext appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContext();
IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
@@ -444,7 +444,7 @@
// right after the new checkpoint file is written.
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContext()
.getIndexLifecycleManager();
List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
List<BlockingIOOperationCallbackWrapper> callbackList = new LinkedList<BlockingIOOperationCallbackWrapper>();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index b76cf11..d49a86b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -14,9 +14,9 @@
*/
package edu.uci.ics.asterix.transaction.management.service.transaction;
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ILogManager;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
@@ -41,12 +41,13 @@
private final IRecoveryManager recoveryManager;
private final TransactionalResourceManagerRepository resourceRepository;
private final IndexLoggerRepository loggerRepository;
- private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
+ private final IAsterixAppRuntimeContext asterixAppRuntimeContext;
private final CheckpointThread checkpointThread;
private final AsterixTransactionProperties txnProperties;
- public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider,
+ public TransactionSubsystem(String id, IAsterixAppRuntimeContext asterixAppRuntimeContext,
AsterixTransactionProperties txnProperties) throws ACIDException {
+ this.asterixAppRuntimeContext = asterixAppRuntimeContext;
this.id = id;
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
@@ -55,10 +56,9 @@
this.recoveryManager = new RecoveryManager(this);
this.loggerRepository = new IndexLoggerRepository(this);
this.resourceRepository = new TransactionalResourceManagerRepository();
- this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
- if (asterixAppRuntimeContextProvider != null) {
+ if (asterixAppRuntimeContext != null) {
this.checkpointThread = new CheckpointThread(recoveryManager,
- asterixAppRuntimeContextProvider.getIndexLifecycleManager(),
+ asterixAppRuntimeContext.getIndexLifecycleManager(),
this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
} else {
this.checkpointThread = null;
@@ -89,8 +89,8 @@
return loggerRepository;
}
- public IAsterixAppRuntimeContextProvider getAsterixAppRuntimeContextProvider() {
- return asterixAppRuntimeContextProvider;
+ public IAsterixAppRuntimeContext getAsterixAppRuntimeContext() {
+ return asterixAppRuntimeContext;
}
public AsterixTransactionProperties getTransactionProperties() {