Prevented merges upon shutdown, made sure to deactivate all indexes upon shutdown. (code reviewed by Zack).
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1536 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 8d38b56..f332155 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -63,6 +63,7 @@
@Override
public void stop() throws Exception {
if (!stopInitiated) {
+ runtimeContext.setShuttingdown(true);
stopInitiated = true;
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Stopping Asterix node controller: " + nodeId);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixAppRuntimeContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixAppRuntimeContext.java
index 21aa203..5070938 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/AsterixAppRuntimeContext.java
@@ -14,14 +14,15 @@
import edu.uci.ics.asterix.transaction.management.service.recovery.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
@@ -57,6 +58,7 @@
private PersistentLocalResourceRepository localResourceRepository;
private ResourceIdFactory resourceIdFactory;
private IIOManager ioManager;
+ private boolean isShuttingdown;
public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
this.ncApplicationContext = ncApplicationContext;
@@ -79,7 +81,7 @@
DEFAULT_MAX_OPEN_FILES);
lsmIOScheduler = SynchronousScheduler.INSTANCE;
- mergePolicy = new ConstantMergePolicy(3);
+ mergePolicy = new ConstantMergePolicy(3, this);
lsmBTreeOpTrackerFactory = new IndexOperationTrackerFactory(LSMBTreeIOOperationCallbackFactory.INSTANCE);
lsmRTreeOpTrackerFactory = new IndexOperationTrackerFactory(LSMRTreeIOOperationCallbackFactory.INSTANCE);
lsmInvertedIndexOpTrackerFactory = new IndexOperationTrackerFactory(
@@ -90,6 +92,15 @@
localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
.createRepository();
resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
+ isShuttingdown = false;
+ }
+
+ public boolean isShuttingdown() {
+ return isShuttingdown;
+ }
+
+ public void setShuttingdown(boolean isShuttingdown) {
+ this.isShuttingdown = isShuttingdown;
}
private int getBufferCachePageSize() {
@@ -135,8 +146,11 @@
return numPages;
}
- public void deinitialize() {
+ public void deinitialize() throws HyracksDataException {
bufferCache.close();
+ for (IIndex index : indexLifecycleManager.getOpenIndexes()) {
+ index.deactivate();
+ }
}
public IBufferCache getBufferCache() {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
new file mode 100644
index 0000000..74b0aa6
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.common.context;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+
+public class ConstantMergePolicy implements ILSMMergePolicy {
+
+ private final int threshold;
+ private final AsterixAppRuntimeContext ctx;
+
+ public ConstantMergePolicy(int threshold, AsterixAppRuntimeContext ctx) {
+ this.threshold = threshold;
+ this.ctx = ctx;
+ }
+
+ public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException,
+ IndexException {
+ if (!ctx.isShuttingdown() && totalNumDiskComponents >= threshold) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+ }
+ }
+}