Merge branch 'master' into raman/asterix_lsm_stabilization_coredump
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e64f68f..d4d7ac6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -27,6 +27,8 @@
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
@@ -99,10 +101,9 @@
ioManager = ncApplicationContext.getRootContext().getIOManager();
bufferCache = new BufferCache(ioManager, allocator, prs, pcp, fileMapManager,
storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages(),
- storageProperties.getBufferCacheMaxOpenFiles());
+ storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
indexLifecycleManager = new IndexLifecycleManager(storageProperties.getMemoryComponentGlobalBudget());
-
lsmIOScheduler = SynchronousScheduler.INSTANCE;
mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
lsmBTreeOpTrackerFactory = new IndexOperationTrackerFactory(LSMBTreeIOOperationCallbackFactory.INSTANCE);
@@ -120,6 +121,14 @@
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider);
isShuttingdown = false;
+
+ // The order of registration is important. The buffer cache must registered before recovery and transaction managers.
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) bufferCache);
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) indexLifecycleManager);
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+ LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
}
public boolean isShuttingdown() {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 5d4f6ec..9b83f00 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -13,6 +13,7 @@
import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.QueryStatusAPIServlet;
import edu.uci.ics.asterix.api.http.servlet.UpdateAPIServlet;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -37,12 +38,12 @@
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
this.appCtx = ccAppCtx;
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix cluster controller");
}
-
+ appCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
AsterixAppContextInfo.initialize(appCtx);
-
proxy = AsterixStateProxy.registerRemoteObject();
appCtx.setDistributedState(proxy);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4569088..32bfeb5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -1,11 +1,15 @@
package edu.uci.ics.asterix.hyracks.bootstrap;
+import java.io.File;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.api.common.AsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -19,6 +23,7 @@
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
@@ -32,17 +37,19 @@
@Override
public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ ncAppCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
ncApplicationContext = ncAppCtx;
nodeId = ncApplicationContext.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting Asterix node controller: " + nodeId);
+ LOGGER.info("Starting Asterix node controller TAKE NOTE: " + nodeId);
}
+ JVMShutdownHook sHook = new JVMShutdownHook(this);
+ Runtime.getRuntime().addShutdownHook(sHook);
+
runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
runtimeContext.initialize();
ncApplicationContext.setApplicationObject(runtimeContext);
- JVMShutdownHook sHook = new JVMShutdownHook(this);
- Runtime.getRuntime().addShutdownHook(sHook);
// #. recover if the system is corrupted by checking system state.
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -78,6 +85,8 @@
if (isMetadataNode) {
MetadataBootstrap.stopUniverse();
}
+
+ LifeCycleComponentManager.INSTANCE.stopAll(false);
runtimeContext.deinitialize();
} else {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -89,7 +98,8 @@
@Override
public void notifyStartupComplete() throws Exception {
IAsterixStateProxy proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
- AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider)runtimeContext).getMetadataProperties();
+ AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+ .getMetadataProperties();
if (systemState == SystemState.NEW_UNIVERSE) {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -113,11 +123,29 @@
}
MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
MetadataManager.INSTANCE.init();
- MetadataBootstrap.startUniverse( ((IAsterixPropertiesProvider)runtimeContext), ncApplicationContext,
+ MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
systemState == SystemState.NEW_UNIVERSE);
MetadataBootstrap.startDDLRecovery();
}
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting lifecycle components");
+ }
+
+ Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
+ String key = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
+ String value = metadataProperties.getCoredumpPath(nodeId);
+ lifecycleMgmtConfiguration.put(key, value);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Coredump directory for NC is: " + value);
+ }
+ LifeCycleComponentManager.INSTANCE.configure(lifecycleMgmtConfiguration);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Configured:" + LifeCycleComponentManager.INSTANCE);
+ }
+
+ LifeCycleComponentManager.INSTANCE.startAll();
+
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
recoveryMgr.checkpoint(true);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
new file mode 100644
index 0000000..14975ff
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class AsterixThreadExecutor implements Executor {
+ public final static AsterixThreadExecutor INSTANCE = new AsterixThreadExecutor();
+ private final Executor executor = Executors.newCachedThreadPool(AsterixThreadFactory.INSTANCE);
+
+ private AsterixThreadExecutor() {
+
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(command);
+ }
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
new file mode 100644
index 0000000..7e4735f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.ThreadFactory;
+
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+
+public class AsterixThreadFactory implements ThreadFactory {
+
+ public final static AsterixThreadFactory INSTANCE = new AsterixThreadFactory();
+
+ private AsterixThreadFactory() {
+
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t;
+ if ((r instanceof Thread)) {
+ t = (Thread) r;
+ } else {
+ t = new Thread(r);
+ }
+ t.setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
+ return t;
+ }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
index 6d47e78..6b6cded 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
@@ -24,5 +24,9 @@
public Set<String> getNodeNames() {
return accessor.getNodeNames();
}
+
+ public String getCoredumpPath(String nodeId){
+ return accessor.getCoredumpPath(nodeId);
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
index 7b2f2a6..d623ae5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
@@ -16,6 +16,7 @@
import javax.xml.bind.Unmarshaller;
import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
import edu.uci.ics.asterix.common.configuration.Property;
import edu.uci.ics.asterix.common.configuration.Store;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -26,6 +27,7 @@
private final String metadataNodeName;
private final Set<String> nodeNames;
private final Map<String, String[]> stores;
+ private final Map<String, String> coredumpConfig;
private final Map<String, Property> asterixConfigurationParams;
public AsterixPropertiesAccessor() throws AsterixException {
@@ -64,6 +66,11 @@
for (Property p : asterixConfiguration.getProperty()) {
asterixConfigurationParams.put(p.getName(), p);
}
+ coredumpConfig = new HashMap<String, String>();
+ for (Coredump cd : asterixConfiguration.getCoredump()) {
+ coredumpConfig.put(cd.getNcId(), cd.getCoredumpPath());
+ }
+
}
public String getMetadataNodeName() {
@@ -82,6 +89,10 @@
return nodeNames;
}
+ public String getCoredumpPath(String nodeId) {
+ return coredumpConfig.get(nodeId);
+ }
+
public <T> T getProperty(String property, T defaultValue, IPropertyInterpreter<T> interpreter) {
Property p = asterixConfigurationParams.get(property);
if (p == null) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 9470d17..c90422c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -50,12 +50,10 @@
ACIDException;
/**
- * @param logicalLogLocator
+ * @param lsnValue
* TODO
* @param logicalLogLocator
* TODO
- * @param PhysicalLogLocator
- * specifies the location of the log record to be read
* @throws ACIDException
*/
public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index a06cc75..a7f2984 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -1,7 +1,6 @@
package edu.uci.ics.asterix.common.transactions;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -42,6 +41,8 @@
public void setTransactionType(TransactionType transactionType);
+ public String prettyPrint();
+
public static final long INVALID_TIME = -1l; // used for showing a
// transaction is not waiting.
public static final int ACTIVE_STATUS = 0;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 2bfc00d..e57cc64 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -120,5 +120,4 @@
*/
public ITransactionSubsystem getTransactionProvider();
-
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
index 4dc943c..89816aa 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
@@ -105,7 +105,7 @@
public String getLogDirKey() {
return logDirKey;
}
-
+
public int getDiskSectorSize() {
return diskSectorSize;
}
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index f53fb4b..5aefdbd 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -7,6 +7,7 @@
<xs:element name="metadataNode" type="xs:string" />
+ <xs:element name="coredumpPath" type="xs:string" />
<xs:element name="storeDirs" type="xs:string" />
<xs:element name="ncId" type="xs:string" />
<xs:element name="name" type="xs:string" />
@@ -23,6 +24,15 @@
</xs:complexType>
</xs:element>
+ <xs:element name="coredump">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref="mg:ncId" />
+ <xs:element ref="mg:coredumpPath" />
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
<xs:element name="property">
<xs:complexType>
<xs:sequence>
@@ -39,6 +49,7 @@
<xs:sequence>
<xs:element ref="mg:metadataNode" minOccurs="0"/>
<xs:element ref="mg:store" maxOccurs="unbounded" />
+ <xs:element ref="mg:coredump" maxOccurs="unbounded" />
<xs:element ref="mg:property" minOccurs="0" maxOccurs="unbounded" />
</xs:sequence>
</xs:complexType>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index e702ef3..63a791b 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -18,7 +18,6 @@
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
index d0dbb98..fd40b03 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
@@ -51,7 +52,7 @@
public void open() throws HyracksDataException {
if (adapter instanceof IManagedFeedAdapter) {
feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
- feedInboxMonitor.start();
+ AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
feedManager.registerFeedMsgQueue(feedId, inbox);
}
writer.open();
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
index 2b884ef..abf0420 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
@@ -49,6 +49,7 @@
import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.Coredump;
import edu.uci.ics.asterix.event.driver.EventDriver;
import edu.uci.ics.asterix.event.management.EventrixClient;
import edu.uci.ics.asterix.event.management.EventUtil;
@@ -223,6 +224,14 @@
}
configuration.setStore(stores);
+ List<Coredump> coredump = new ArrayList<Coredump>();
+ String coredumpDir = null;
+ for (Node node : cluster.getNode()) {
+ coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+ coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir));
+ }
+ configuration.setCoredump(coredump);
+
File asterixConfDir = new File(InstallerDriver.getAsterixDir() + File.separator + asterixInstanceName);
asterixConfDir.mkdirs();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index f9f5260..c33d130 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -43,38 +43,29 @@
* received from the metadata node, to avoid contacting the metadata node
* repeatedly. We assume that this metadata manager is the only metadata manager
* in an Asterix cluster. Therefore, no separate cache-invalidation mechanism is
- * needed at this point.
- * Assumptions/Limitations:
- * The metadata subsystem is started during NC Bootstrap start, i.e., when
- * Asterix is deployed.
- * The metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix
- * is undeployed.
- * The metadata subsystem consists of the MetadataManager and the MatadataNode.
- * The MetadataManager provides users access to the metadata.
- * The MetadataNode implements direct access to the storage layer on behalf of
- * the MetadataManager, and translates the binary representation of ADM into
- * Java objects for consumption by the MetadataManager's users.
- * There is exactly one instance of the MetadataManager and of the MetadataNode
- * in the cluster, which may or may not be co-located on the same machine (or in
- * the same JVM).
- * The MetadataManager exists in the same JVM as its user's (e.g., the query
- * compiler).
- * The MetadataNode exists in the same JVM as it's transactional components
- * (LockManager, LogManager, etc.)
- * Users shall access the metadata only through the MetadataManager, and never
- * via the MetadataNode directly.
+ * needed at this point. Assumptions/Limitations: The metadata subsystem is
+ * started during NC Bootstrap start, i.e., when Asterix is deployed. The
+ * metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix is
+ * undeployed. The metadata subsystem consists of the MetadataManager and the
+ * MatadataNode. The MetadataManager provides users access to the metadata. The
+ * MetadataNode implements direct access to the storage layer on behalf of the
+ * MetadataManager, and translates the binary representation of ADM into Java
+ * objects for consumption by the MetadataManager's users. There is exactly one
+ * instance of the MetadataManager and of the MetadataNode in the cluster, which
+ * may or may not be co-located on the same machine (or in the same JVM). The
+ * MetadataManager exists in the same JVM as its user's (e.g., the query
+ * compiler). The MetadataNode exists in the same JVM as it's transactional
+ * components (LockManager, LogManager, etc.) Users shall access the metadata
+ * only through the MetadataManager, and never via the MetadataNode directly.
* Multiple threads may issue requests to the MetadataManager concurrently. For
* the sake of accessing metadata, we assume a transaction consists of one
- * thread.
- * Users are responsible for locking the metadata (using the MetadataManager
- * API) before issuing requests.
- * The MetadataNode is responsible for acquiring finer-grained locks on behalf
- * of requests from the MetadataManager. Currently, locks are acquired per
- * BTree, since the BTree does not acquire even finer-grained locks yet
- * internally.
- * The metadata can be queried with AQL DML like any other dataset, but can only
- * be changed with AQL DDL.
- * The transaction ids for metadata transactions must be unique across the
+ * thread. Users are responsible for locking the metadata (using the
+ * MetadataManager API) before issuing requests. The MetadataNode is responsible
+ * for acquiring finer-grained locks on behalf of requests from the
+ * MetadataManager. Currently, locks are acquired per BTree, since the BTree
+ * does not acquire even finer-grained locks yet internally. The metadata can be
+ * queried with AQL DML like any other dataset, but can only be changed with AQL
+ * DDL. The transaction ids for metadata transactions must be unique across the
* cluster, i.e., metadata transaction ids shall never "accidentally" overlap
* with transaction ids of regular jobs or other metadata transactions.
*/
@@ -220,7 +211,7 @@
@Override
public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
- // add dataset into metadataNode
+ // add dataset into metadataNode
try {
metadataNode.addDataset(ctx.getJobId(), dataset);
} catch (RemoteException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
index d5e525a..d583352 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
@@ -437,6 +437,20 @@
return s.toString();
}
+
+ public String coreDump() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\t firstUpgrader: " + firstUpgrader);
+ sb.append("\n\t firstWaiter: " + firstWaiter);
+ sb.append("\n\t lastHolder: " + lastHolder);
+ sb.append("\n\t ISCount: " + ISCount);
+ sb.append("\n\t IXCount: " + IXCount);
+ sb.append("\n\t SCount: " + SCount);
+ sb.append("\n\t XCount: " + XCount);
+ sb.append("\n\t entityResourceHT");
+ sb.append(entityResourceHT.prettyPrint());
+ return sb.toString();
+ }
/////////////////////////////////////////////////////////
// set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
index 5d81e8a..206a3bf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -272,29 +274,35 @@
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- s.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
- .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
- for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
- s.append(j).append(": ");
- s.append("\t" + child.getJobId(j));
- s.append("\t" + child.getDatasetId(j));
- s.append("\t" + child.getPKHashVal(j));
- s.append("\t" + child.getDatasetLockMode(j));
- s.append("\t" + child.getDatasetLockCount(j));
- s.append("\t" + child.getEntityLockMode(j));
- s.append("\t" + child.getEntityLockCount(j));
- s.append("\t" + child.getNextEntityActor(j));
- s.append("\t" + child.getPrevJobResource(j));
- s.append("\t" + child.getNextJobResource(j));
- //s.append("\t" + child.getNextDatasetActor(j));
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+
+ public void coreDump(OutputStream os) {
+ StringBuilder sb = new StringBuilder("\n\t########### EntityLockInfoManager Status #############\n");
+ int size = pArray.size();
+ ChildEntityInfoArrayManager child;
+
+ sb.append("Number of Child: " + size + "\n");
+ for (int i = 0; i < size; i++) {
+ try {
+ child = pArray.get(i);
+ sb.append("child[" + i + "]");
+ sb.append(child.prettyPrint());
+
+ os.write(sb.toString().getBytes());
+ } catch (IOException e) {
+ //ignore IOException
+ }
+ sb = new StringBuilder();
+ }
+ }
+
+ public int getShrinkTimerThreshold() {
+ return SHRINK_TIMER_THRESHOLD;
+ }
public void initEntityInfo(int slotNum, int jobId, int datasetId, int PKHashVal, byte lockMode) {
pArray.get(slotNum / ChildEntityInfoArrayManager.NUM_OF_SLOTS).initEntityInfo(
@@ -567,6 +575,29 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+ public String prettyPrint() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+ sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+ sb.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
+ .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
+ for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
+ sb.append(j).append(": ");
+ sb.append("\t" + getJobId(j));
+ sb.append("\t" + getDatasetId(j));
+ sb.append("\t" + getPKHashVal(j));
+ sb.append("\t" + getDatasetLockMode(j));
+ sb.append("\t" + getDatasetLockCount(j));
+ sb.append("\t" + getEntityLockMode(j));
+ sb.append("\t" + getEntityLockCount(j));
+ sb.append("\t" + getNextEntityActor(j));
+ sb.append("\t" + getPrevJobResource(j));
+ sb.append("\t" + getNextJobResource(j));
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
//////////////////////////////////////////////////////////////////
// set/get method for each field of EntityInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
index ca00aa2..2fae460 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -271,22 +273,35 @@
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- s.append("\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
- for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
- s.append(j).append(": ");
- s.append("\t" + child.getXCount(j));
- s.append("\t" + child.getSCount(j));
- s.append("\t" + child.getLastHolder(j));
- s.append("\t" + child.getFirstWaiter(j));
- s.append("\t" + child.getUpgrader(j));
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+
+ public void coreDump(OutputStream os) {
+ StringBuilder sb = new StringBuilder("\n\t########### EntityLockInfoManager Status #############\n");
+ int size = pArray.size();
+ ChildEntityLockInfoArrayManager child;
+
+ sb.append("Number of Child: " + size + "\n");
+ for (int i = 0; i < size; i++) {
+ try {
+ child = pArray.get(i);
+ sb.append("child[" + i + "]");
+ sb.append(child.prettyPrint());
+
+ os.write(sb.toString().getBytes());
+ } catch (IOException e) {
+ //ignore IOException
+ }
+ sb = new StringBuilder();
+ }
+ }
+
+ public int getShrinkTimerThreshold() {
+ return SHRINK_TIMER_THRESHOLD;
+ }
//debugging method
public String printWaiters(int slotNum) {
@@ -736,6 +751,23 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+ public String prettyPrint() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+ sb.append("\n\tfreeSlotNum:" + getFreeSlotNum());
+ sb.append("\n\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
+ for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
+ sb.append(j).append(": ");
+ sb.append("\t" + getXCount(j));
+ sb.append("\t" + getSCount(j));
+ sb.append("\t" + getLastHolder(j));
+ sb.append("\t" + getFirstWaiter(j));
+ sb.append("\t" + getUpgrader(j));
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
//////////////////////////////////////////////////////////////////
// set/get method for each field of EntityLockInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index d846603..8bf5304 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -275,6 +275,17 @@
}
return s.toString();
}
+
+ public String coreDump() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\t datasetISLockHT");
+ sb.append(datasetISLockHT.prettyPrint());
+ sb.append("\n\t firstWaitingResource: " + firstWaitingResource);
+ sb.append("\n\t lastHoldingResource: " + lastHoldingResource);
+ sb.append("\n\t upgradingResource: " + upgradingResource);
+ sb.append("\n\t jobCtx.jobId: " + jobCtx.getJobId());
+ return sb.toString();
+ }
/////////////////////////////////////////////////////////
// set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 8fb7494..cf41199 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -15,10 +15,13 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -33,6 +36,7 @@
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
* An implementation of the ILockManager interface for the
@@ -42,7 +46,7 @@
* @author pouria, kisskys
*/
-public class LockManager implements ILockManager {
+public class LockManager implements ILockManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
//This variable indicates that the dataset granule X lock request is allowed when
@@ -2046,6 +2050,182 @@
unlatchLockTable();
}
}
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+
+ //#. dump Configurable Variables
+ dumpConfVars(os);
+
+ //#. dump jobHT
+ dumpJobInfo(os);
+
+ //#. dump datasetResourceHT
+ dumpDatasetLockInfo(os);
+
+ //#. dump entityLockInfoManager
+ dumpEntityLockInfo(os);
+
+ //#. dump entityInfoManager
+ dumpEntityInfo(os);
+
+ //#. dump lockWaiterManager
+
+ dumpLockWaiterInfo(os);
+ try {
+ os.flush();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+
+ private void dumpConfVars(OutputStream os) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+ sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: " + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+ sb.append("\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+ + entityLockInfoManager.getShrinkTimerThreshold());
+ sb.append("\nSHRINK_TIMER_THRESHOLD (entityInfoManager): " + entityInfoManager.getShrinkTimerThreshold());
+ sb.append("\nSHRINK_TIMER_THRESHOLD (lockWaiterManager): " + lockWaiterManager.getShrinkTimerThreshold());
+ sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpJobInfo(OutputStream os) {
+ JobId jobId;
+ JobInfo jobInfo;
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("\n>>dump_begin\t>>----- [JobInfo] -----");
+ Set<Map.Entry<JobId, JobInfo>> entrySet = jobHT.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<JobId, JobInfo> entry : entrySet) {
+ if (entry != null) {
+ jobId = entry.getKey();
+ if (jobId != null) {
+ sb.append("\n" + jobId);
+ } else {
+ sb.append("\nJID:null");
+ }
+
+ jobInfo = entry.getValue();
+ if (jobInfo != null) {
+ sb.append(jobInfo.coreDump());
+ } else {
+ sb.append("\nJobInfo:null");
+ }
+ }
+ }
+ }
+ sb.append("\n>>dump_end\t>>----- [JobInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpDatasetLockInfo(OutputStream os) {
+ DatasetId datasetId;
+ DatasetLockInfo datasetLockInfo;
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("\n>>dump_begin\t>>----- [DatasetLockInfo] -----");
+ Set<Map.Entry<DatasetId, DatasetLockInfo>> entrySet = datasetResourceHT.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<DatasetId, DatasetLockInfo> entry : entrySet) {
+ if (entry != null) {
+ datasetId = entry.getKey();
+ if (datasetId != null) {
+ sb.append("\nDatasetId:" + datasetId.getId());
+ } else {
+ sb.append("\nDatasetId:null");
+ }
+
+ datasetLockInfo = entry.getValue();
+ if (datasetLockInfo != null) {
+ sb.append(datasetLockInfo.coreDump());
+ } else {
+ sb.append("\nDatasetLockInfo:null");
+ }
+ }
+ sb.append("\n>>dump_end\t>>----- [DatasetLockInfo] -----\n");
+ os.write(sb.toString().getBytes());
+
+ //create a new sb to avoid possible OOM exception
+ sb = new StringBuilder();
+ }
+ }
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpEntityLockInfo(OutputStream os) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [EntityLockInfo] -----");
+ entityLockInfoManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [EntityLockInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpEntityInfo(OutputStream os) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [EntityInfo] -----");
+ entityInfoManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [EntityInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpLockWaiterInfo(OutputStream os) {
+ StringBuilder sb = new StringBuilder();
+ try {
+ sb.append("\n>>dump_begin\t>>----- [LockWaiterInfo] -----");
+ lockWaiterManager.coreDump(os);
+ sb.append("\n>>dump_end\t>>----- [LockWaiterInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
}
class ConsecutiveWakeupContext {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
index bd414de..c6fcbd5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.service.locking;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
/**
@@ -258,28 +260,42 @@
StringBuilder s = new StringBuilder("\n########### LockWaiterManager Status #############\n");
int size = pArray.size();
ChildLockWaiterArrayManager child;
- LockWaiter waiter;
for (int i = 0; i < size; i++) {
child = pArray.get(i);
if (child.isDeinitialized()) {
continue;
}
- s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
- s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
- for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
- waiter = child.getLockWaiter(j);
- s.append(j).append(": ");
- s.append("\t" + waiter.getEntityInfoSlot());
- s.append("\t" + waiter.needWait());
- s.append("\t" + waiter.isVictim());
- s.append("\n");
- }
- s.append("\n");
+ s.append("child[" + i + "]");
+ s.append(child.prettyPrint());
}
return s.toString();
}
+ public void coreDump(OutputStream os) {
+ StringBuilder sb = new StringBuilder("\n########### LockWaiterManager Status #############\n");
+ int size = pArray.size();
+ ChildLockWaiterArrayManager child;
+
+ sb.append("Number of Child: " + size + "\n");
+ for (int i = 0; i < size; i++) {
+ try {
+ child = pArray.get(i);
+ sb.append("child[" + i + "]");
+ sb.append(child.prettyPrint());
+
+ os.write(sb.toString().getBytes());
+ } catch (IOException e) {
+ //ignore IOException
+ }
+ sb = new StringBuilder();
+ }
+ }
+
+ public int getShrinkTimerThreshold() {
+ return SHRINK_TIMER_THRESHOLD;
+ }
+
public LockWaiter getLockWaiter(int slotNum) {
return pArray.get(slotNum / ChildLockWaiterArrayManager.NUM_OF_SLOTS).getLockWaiter(
slotNum % ChildLockWaiterArrayManager.NUM_OF_SLOTS);
@@ -364,4 +380,20 @@
public int getFreeSlotNum() {
return freeSlotNum;
}
+
+ public String prettyPrint() {
+ LockWaiter waiter;
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+ sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+ for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
+ waiter = getLockWaiter(j);
+ sb.append(j).append(": ");
+ sb.append("\t" + waiter.getEntityInfoSlot());
+ sb.append("\t" + waiter.needWait());
+ sb.append("\t" + waiter.isVictim());
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
index a53c890..05052f0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
@@ -2,6 +2,7 @@
import java.util.LinkedList;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
/**
@@ -26,7 +27,7 @@
this.lockMgr = lockMgr;
this.trigger = new Thread(new TimeoutTrigger(this));
trigger.setDaemon(true);
- trigger.start();
+ AsterixThreadExecutor.INSTANCE.execute(trigger);
}
public void sweep() throws ACIDException {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 318996a..205c5f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -17,6 +17,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@@ -31,6 +32,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.FileBasedBuffer;
import edu.uci.ics.asterix.common.transactions.FileUtil;
@@ -49,8 +51,9 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
-public class LogManager implements ILogManager {
+public class LogManager implements ILogManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
@@ -185,7 +188,7 @@
*/
logPageFlusher = new LogPageFlushThread(this);
logPageFlusher.setDaemon(true);
- logPageFlusher.start();
+ AsterixThreadExecutor.INSTANCE.execute(logPageFlusher);
}
public int getLogPageIndex(long lsnValue) {
@@ -758,6 +761,60 @@
map.clear();
}
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+ //#. dump Configurable Variables
+ dumpConfVars(os);
+
+ //#. dump LSNInfo
+ dumpLSNInfo(os);
+
+ try {
+ os.flush();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+
+ private void dumpConfVars(OutputStream os) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+ sb.append(logManagerProperties.toString());
+ sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void dumpLSNInfo(OutputStream os) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n>>dump_begin\t>>----- [LSNInfo] -----");
+ sb.append("\nstartingLSN: " + startingLSN);
+ sb.append("\ncurrentLSN: " + lsn.get());
+ sb.append("\nlastFlushedLSN: " + lastFlushedLSN.get());
+ sb.append("\n>>dump_end\t>>----- [LSNInfo] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
}
/*
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 50d4625..1e85c35 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -57,6 +58,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
@@ -83,7 +85,7 @@
* not in place completely. Once we have physical logging implemented, we would
* add support for crash recovery.
*/
-public class RecoveryManager implements IRecoveryManager {
+public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
@@ -430,7 +432,7 @@
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting sharp checkpoint ... ");
}
-
+
LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
String logDir = logMgr.getLogManagerProperties().getLogDir();
@@ -522,7 +524,7 @@
if (isSharpCheckpoint) {
logMgr.renewLogFiles();
}
-
+
if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Completed sharp checkpoint.");
}
@@ -782,6 +784,16 @@
+ undoCount);
}
}
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ //no op
+ }
}
class TxnId {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 2659d8f..d4360ef 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -172,5 +172,15 @@
return (o == this);
}
-
+ public String prettyPrint() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n" + jobId + "\n");
+ sb.append("transactionType: " + transactionType);
+ sb.append("firstLogLocator: " + firstLogLocator.getLsn() + "\n");
+ sb.append("lastLogLocator: " + lastLogLocator.getLsn() + "\n");
+ sb.append("TransactionState: " + txnState + "\n");
+ sb.append("startWaitTime: " + startWaitTime + "\n");
+ sb.append("status: " + status + "\n");
+ return sb.toString();
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 4e8808f..04f12ac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -14,8 +14,11 @@
*/
package edu.uci.ics.asterix.transaction.management.service.transaction;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -26,15 +29,18 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
* An implementation of the @see ITransactionManager interface that provides
* implementation of APIs for governing the lifecycle of a transaction.
*/
-public class TransactionManager implements ITransactionManager {
+public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
+
+ public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
private final TransactionSubsystem transactionProvider;
- private Map<JobId, ITransactionContext> ITransactionContextRepository = new HashMap<JobId, ITransactionContext>();
+ private Map<JobId, ITransactionContext> transactionContextRepository = new HashMap<JobId, ITransactionContext>();
private AtomicInteger maxJobId = new AtomicInteger(0);
public TransactionManager(TransactionSubsystem provider) {
@@ -61,7 +67,7 @@
} finally {
txnContext.releaseResources();
transactionProvider.getLockManager().releaseLocks(txnContext);
- ITransactionContextRepository.remove(txnContext.getJobId());
+ transactionContextRepository.remove(txnContext.getJobId());
txnContext.setTxnState(TransactionState.ABORTED);
}
}
@@ -72,7 +78,7 @@
setMaxJobId(jobId.getId());
ITransactionContext txnContext = new TransactionContext(jobId, transactionProvider);
synchronized (this) {
- ITransactionContextRepository.put(jobId, txnContext);
+ transactionContextRepository.put(jobId, txnContext);
}
return txnContext;
}
@@ -80,13 +86,13 @@
@Override
public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
setMaxJobId(jobId.getId());
- synchronized (ITransactionContextRepository) {
+ synchronized (transactionContextRepository) {
- ITransactionContext context = ITransactionContextRepository.get(jobId);
+ ITransactionContext context = transactionContextRepository.get(jobId);
if (context == null) {
- context = ITransactionContextRepository.get(jobId);
+ context = transactionContextRepository.get(jobId);
context = new TransactionContext(jobId, transactionProvider);
- ITransactionContextRepository.put(jobId, context);
+ transactionContextRepository.put(jobId, context);
}
return context;
}
@@ -107,13 +113,13 @@
if (PKHashVal != -1) {
transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
/*****************************
- try {
- //decrease the transaction reference count on index
- txnContext.decreaseActiveTransactionCountOnIndexes();
- } catch (HyracksDataException e) {
- throw new ACIDException("failed to complete index operation", e);
- }
- *****************************/
+ * try {
+ * //decrease the transaction reference count on index
+ * txnContext.decreaseActiveTransactionCountOnIndexes();
+ * } catch (HyracksDataException e) {
+ * throw new ACIDException("failed to complete index operation", e);
+ * }
+ *****************************/
return;
}
@@ -131,7 +137,7 @@
} finally {
txnContext.releaseResources();
transactionProvider.getLockManager().releaseLocks(txnContext); // release
- ITransactionContextRepository.remove(txnContext.getJobId());
+ transactionContextRepository.remove(txnContext.getJobId());
txnContext.setTxnState(TransactionState.COMMITTED);
}
}
@@ -151,12 +157,69 @@
public TransactionSubsystem getTransactionProvider() {
return transactionProvider;
}
-
+
public void setMaxJobId(int jobId) {
maxJobId.set(Math.max(maxJobId.get(), jobId));
}
-
+
public int getMaxJobId() {
return maxJobId.get();
}
+
+ @Override
+ public void start() {
+ //no op
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream os) {
+ if (dumpState) {
+ //#. dump TxnContext
+ dumpTxnContext(os);
+
+ try {
+ os.flush();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
+
+ private void dumpTxnContext(OutputStream os) {
+ JobId jobId;
+ ITransactionContext txnCtx;
+ StringBuilder sb = new StringBuilder();
+
+ try {
+ sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+ Set<Map.Entry<JobId, ITransactionContext>> entrySet = transactionContextRepository.entrySet();
+ if (entrySet != null) {
+ for (Map.Entry<JobId, ITransactionContext> entry : entrySet) {
+ if (entry != null) {
+ jobId = entry.getKey();
+ if (jobId != null) {
+ sb.append("\n" + jobId);
+ } else {
+ sb.append("\nJID:null");
+ }
+
+ txnCtx = entry.getValue();
+ if (txnCtx != null) {
+ sb.append(txnCtx.prettyPrint());
+ } else {
+ sb.append("\nTxnCtx:null");
+ }
+ }
+ }
+ }
+
+ sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+ os.write(sb.toString().getBytes());
+ } catch (Exception e) {
+ //ignore exception and continue dumping as much as possible.
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
+ }
+ }
}