Merge branch 'master' into raman/fullstack_lsm_staging_coredump
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index 6ee719b..54aac5e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.application;
 
 import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
 import edu.uci.ics.hyracks.api.messages.IMessageBroker;
@@ -39,4 +40,8 @@
 
     public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer();
 
+    public ThreadFactory getThreadFactory();
+
+    public void setThreadFactory(ThreadFactory threadFactory);
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
index 2e7feb5..668950b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
@@ -47,4 +48,6 @@
     public void close(IFileHandle fHandle) throws HyracksDataException;
 
     public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+
+    public void setExecutor(Executor executor);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
new file mode 100644
index 0000000..2cb757f
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.lifecycle;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface ILifeCycleComponent {
+
+    public void start();
+
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
new file mode 100644
index 0000000..6247c17
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.lifecycle;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Map;
+
+public interface ILifeCycleComponentManager extends UncaughtExceptionHandler {
+
+    public void register(ILifeCycleComponent component);
+
+    public void startAll();
+
+    public void stopAll(boolean dumpState) throws IOException;
+
+    public void configure(Map<String, String> configuration);
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
new file mode 100644
index 0000000..ec27653
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.lifecycle;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class LifeCycleComponentManager implements ILifeCycleComponentManager {
+
+    public final static LifeCycleComponentManager INSTANCE = new LifeCycleComponentManager();
+
+    public static final class Config {
+        public static final String DUMP_PATH_KEY = "DUMP_PATH";
+    }
+
+    private static final Logger LOGGER = Logger.getLogger(LifeCycleComponentManager.class.getName());
+
+    private final List<ILifeCycleComponent> components;
+    private boolean stopInitiated;
+    private String dumpPath;
+    private boolean configured;
+
+    private LifeCycleComponentManager() {
+        components = new ArrayList<ILifeCycleComponent>();
+        stopInitiated = false;
+        configured = false;
+    }
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        if (LOGGER.isLoggable(Level.SEVERE)) {
+            LOGGER.severe("Uncaught Exception from thread " + t.getName() + " message: " + e.getMessage());
+            e.printStackTrace();
+        }
+        try {
+            stopAll(true);
+        } catch (IOException e1) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe("Exception in stopping Asterix. " + e1.getMessage());
+            }
+        }
+    }
+
+    @Override
+    public synchronized void register(ILifeCycleComponent component) {
+        components.add(component);
+    }
+
+    @Override
+    public void startAll() {
+        for (ILifeCycleComponent component : components) {
+            component.start();
+        }
+    }
+
+    @Override
+    public synchronized void stopAll(boolean dumpState) throws IOException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.severe("Attempting to stop " + this);
+        }
+        if (stopInitiated) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.severe("Stop already in progress");
+            }
+            return;
+        }
+        if (!configured) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe("Lifecycle management not configured" + this);
+            }
+            return;
+        }
+
+        stopInitiated = true;
+        if (LOGGER.isLoggable(Level.SEVERE)) {
+            LOGGER.severe("Stopping Asterix instance");
+        }
+
+        FileOutputStream componentDumpStream = null;
+        String componentDumpPath = null;
+        for (int index = components.size() - 1; index >= 0; index--) {
+            ILifeCycleComponent component = components.get(index);
+            try {
+                if (dumpState) {
+                    componentDumpPath = dumpPath + File.separator + component.getClass().getName() + "-coredump";
+                    File f = new File(componentDumpPath);
+                    File parentDir = new File(f.getParent());
+                    if (!parentDir.exists()) {
+                        parentDir.mkdirs();
+                    }
+                    componentDumpStream = new FileOutputStream(f);
+                }
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe("Stopping component instance" + component.getClass().getName() + " dump state "
+                            + dumpState + " dump path " + componentDumpPath);
+                }
+                component.stop(dumpState, componentDumpStream);
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe("Exception in stopping component " + component.getClass().getName() + e.getMessage());
+                }
+            } finally {
+                if (componentDumpStream != null) {
+                    componentDumpStream.close();
+                }
+            }
+        }
+        stopInitiated = false;
+
+    }
+
+    @Override
+    public void configure(Map<String, String> configuration) {
+        dumpPath = configuration.get(Config.DUMP_PATH_KEY);
+        if (dumpPath == null) {
+            dumpPath = System.getProperty("user.dir");
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe("dump path not configured. Using current directory " + dumpPath);
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.severe("LifecycleComponentManager configured " + this);
+        }
+        configured = true;
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 5294428..1e78f09 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -122,7 +122,7 @@
 
     private final WorkQueue workQueue;
 
-    private final ExecutorService executor;
+    private ExecutorService executor;
 
     private final Timer timer;
 
@@ -143,7 +143,6 @@
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
         ipAddressNodeNameMap = new HashMap<String, Set<String>>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
-        executor = Executors.newCachedThreadPool();
         IIPCI ccIPCI = new ClusterControllerIPCI();
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
@@ -235,6 +234,7 @@
                     .size()]);
             aep.start(appCtx, args);
         }
+        executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 58ae79e..04ee3bd 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -16,6 +16,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
@@ -28,6 +29,11 @@
     protected Serializable distributedState;
     protected IMessageBroker messageBroker;
     protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
+    protected ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public ApplicationContext(ServerContext serverCtx) throws IOException {
         this.serverCtx = serverCtx;
@@ -52,4 +58,14 @@
     public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
         return this.jobSerDeContainer;
     }
+
+    @Override
+    public ThreadFactory getThreadFactory() {
+        return threadFactory;
+    }
+
+    @Override
+    public void setThreadFactory(ThreadFactory threadFactory) {
+        this.threadFactory = threadFactory;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
index d31d09e..489f696 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -14,11 +14,17 @@
  */
 package edu.uci.ics.hyracks.control.nc;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.kohsuke.args4j.CmdLineParser;
 
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 
 public class NCDriver {
+    private static final Logger LOGGER = Logger.getLogger(NCDriver.class.getName());
+
     public static void main(String args[]) throws Exception {
         NCConfig ncConfig = new NCConfig();
         CmdLineParser cp = new CmdLineParser(ncConfig);
@@ -31,6 +37,10 @@
         }
 
         final NodeControllerService nService = new NodeControllerService(ncConfig);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.severe("Setting uncaught exception handler " + LifeCycleComponentManager.INSTANCE);
+        }
+        Thread.currentThread().setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
         nService.start();
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index a1c3b08..f82aa37 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -99,9 +99,9 @@
 
     private final NetworkManager netManager;
 
-    private final IDatasetPartitionManager datasetPartitionManager;
+    private IDatasetPartitionManager datasetPartitionManager;
 
-    private final DatasetNetworkManager datasetNetworkManager;
+    private DatasetNetworkManager datasetNetworkManager;
 
     private final WorkQueue queue;
 
@@ -115,7 +115,7 @@
 
     private final Map<JobId, Joblet> jobletMap;
 
-    private final ExecutorService executor;
+    private ExecutorService executor;
 
     private NodeParameters nodeParameters;
 
@@ -142,22 +142,17 @@
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
-        executor = Executors.newCachedThreadPool();
         NodeControllerIPCI ipci = new NodeControllerIPCI();
         ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
                 new CCNCFunctions.SerializerDeserializer());
-        this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices), executor));
+
+        this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices)));
         if (id == null) {
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
 
-        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
-                ncConfig.resultHistorySize);
-        datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
-                datasetPartitionManager, ncConfig.nNetThreads);
-
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
         timer = new Timer(true);
@@ -219,6 +214,14 @@
         fv.setValue(ncInfos);
     }
 
+    private void init() throws Exception {
+        ctx.getIOManager().setExecutor(executor);
+        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+                ncConfig.resultHistorySize);
+        datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
+                datasetPartitionManager, ncConfig.nNetThreads);
+    }
+
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
@@ -226,6 +229,7 @@
         netManager.start();
 
         startApplication();
+        init();
 
         datasetNetworkManager.start();
         IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
@@ -280,6 +284,7 @@
                     .toArray(new String[ncConfig.appArgs.size()]);
             ncAppEntryPoint.start(appCtx, args);
         }
+        executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
index 3b13f32..c94953b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -33,15 +33,19 @@
 public class IOManager implements IIOManager {
     private final List<IODeviceHandle> ioDevices;
 
-    private final Executor executor;
+    private Executor executor;
 
     private final List<IODeviceHandle> workAreaIODevices;
 
     private int workAreaDeviceIndex;
 
     public IOManager(List<IODeviceHandle> devices, Executor executor) throws HyracksException {
-        this.ioDevices = Collections.unmodifiableList(devices);
+        this(devices);
         this.executor = executor;
+    }
+
+    public IOManager(List<IODeviceHandle> devices) throws HyracksException {
+        this.ioDevices = Collections.unmodifiableList(devices);
         workAreaIODevices = new ArrayList<IODeviceHandle>();
         for (IODeviceHandle d : ioDevices) {
             if (d.getWorkAreaPath() != null) {
@@ -55,6 +59,10 @@
         workAreaDeviceIndex = 0;
     }
 
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
     @Override
     public List<IODeviceHandle> getIODevices() {
         return ioDevices;
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
index aa5ce1d..336c3fc 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.examples.btree.helper;
 
+import java.util.concurrent.ThreadFactory;
+
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -42,13 +44,18 @@
     private ILocalResourceRepository localResourceRepository;
     private IIndexLifecycleManager lcManager;
     private ResourceIdFactory resourceIdFactory;
+    private ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public RuntimeContext(INCApplicationContext appCtx) throws HyracksDataException {
         fileMapManager = new TransientFileMapManager();
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
         bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
-                new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100);
+                new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100, threadFactory);
         lcManager = new IndexLifecycleManager();
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
         localResourceRepository = localResourceRepositoryFactory.createRepository();
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index 197aecc..1c8bd28 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -1,5 +1,7 @@
 package edu.uci.ics.hyracks.storage.am.common.dataflow;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -7,10 +9,11 @@
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 
-public class IndexLifecycleManager implements IIndexLifecycleManager {
+public class IndexLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
     private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
 
     private final Map<Long, IndexInfo> indexInfos;
@@ -29,8 +32,10 @@
     }
 
     private boolean evictCandidateIndex() throws HyracksDataException {
-        // Why min()? As a heuristic for eviction, we will take an open index (an index consuming memory) 
-        // that is not being used (refcount == 0) and has been least recently used. The sort order defined 
+        // Why min()? As a heuristic for eviction, we will take an open index
+        // (an index consuming memory)
+        // that is not being used (refcount == 0) and has been least recently
+        // used. The sort order defined
         // for IndexInfo maintains this. See IndexInfo.compareTo().
         IndexInfo info = Collections.min(indexInfos.values());
         if (info.referenceCount != 0 || !info.isOpen) {
@@ -130,16 +135,17 @@
 
         @Override
         public int compareTo(IndexInfo i) {
-            // sort by (isOpen, referenceCount, lastAccess) ascending, where true < false
+            // sort by (isOpen, referenceCount, lastAccess) ascending, where
+            // true < false
             //
             // Example sort order:
             // -------------------
-            // (F, 0, 70)       <-- largest
+            // (F, 0, 70) <-- largest
             // (F, 0, 60)
             // (T, 10, 80)
             // (T, 10, 70)
             // (T, 9, 90)
-            // (T, 0, 100)      <-- smallest
+            // (T, 0, 100) <-- smallest
             if (isOpen && !i.isOpen) {
                 return -1;
             } else if (!isOpen && i.isOpen) {
@@ -178,4 +184,38 @@
         }
         return openIndexes;
     }
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream outputStream) throws IOException {
+        if (dumpState) {
+            dumpState(outputStream);
+        }
+
+        for (IndexInfo i : indexInfos.values()) {
+            if (i.isOpen) {
+                i.index.deactivate();
+            }
+        }
+    }
+
+    private void dumpState(OutputStream os) throws IOException {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(String.format("Memory budget = %d\n", memoryBudget));
+        sb.append(String.format("Memory used = %d\n", memoryUsed));
+
+        String headerFormat = "%-20s %-10s %-20s %-20s %-20s\n";
+        String rowFormat = "%-20d %-10b %-20d %-20s %-20s\n";
+        sb.append(String.format(headerFormat, "ResourceID", "Open", "Reference Count", "Last Access", "Index Name"));
+        IndexInfo ii;
+        for (Map.Entry<Long, IndexInfo> entry : indexInfos.entrySet()) {
+            ii = entry.getValue();
+            sb.append(String.format(rowFormat, entry.getKey(), ii.isOpen, ii.referenceCount, ii.lastAccess, ii.index));
+        }
+        os.write(sb.toString().getBytes());
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 03e46f5..6b6c6a1 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -14,9 +14,14 @@
  */
 package edu.uci.ics.hyracks.storage.common.buffercache;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -30,10 +35,11 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
 
-public class BufferCache implements IBufferCacheInternal {
+public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
     private static final int MAP_FACTOR = 2;
 
@@ -45,7 +51,6 @@
     private final IIOManager ioManager;
     private final int pageSize;
     private final int numPages;
-    private final CachedPage[] cachedPages;
     private final CacheBucket[] pageMap;
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IPageCleanerPolicy pageCleanerPolicy;
@@ -53,31 +58,34 @@
     private final CleanerThread cleanerThread;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
 
+    private CachedPage[] cachedPages;
+
     private boolean closed;
 
     public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
             IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy,
-            IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
+            IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles, ThreadFactory threadFactory) {
         this.ioManager = ioManager;
         this.pageSize = pageSize;
         this.numPages = numPages;
         this.maxOpenFiles = maxOpenFiles;
         pageReplacementStrategy.setBufferCache(this);
+        pageMap = new CacheBucket[numPages * MAP_FACTOR];
+        for (int i = 0; i < pageMap.length; ++i) {
+            pageMap[i] = new CacheBucket();
+        }
         ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
         cachedPages = new CachedPage[buffers.length];
         for (int i = 0; i < buffers.length; ++i) {
             cachedPages[i] = new CachedPage(i, buffers[i], pageReplacementStrategy);
         }
-        pageMap = new CacheBucket[numPages * MAP_FACTOR];
-        for (int i = 0; i < pageMap.length; ++i) {
-            pageMap[i] = new CacheBucket();
-        }
         this.pageReplacementStrategy = pageReplacementStrategy;
         this.pageCleanerPolicy = pageCleanerPolicy;
         this.fileMapManager = fileMapManager;
+        Executor executor = Executors.newCachedThreadPool(threadFactory);
         fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
         cleanerThread = new CleanerThread();
-        cleanerThread.start();
+        executor.execute(cleanerThread);
         closed = false;
     }
 
@@ -137,7 +145,8 @@
         pinSanityCheck(dpid);
         CachedPage cPage = findPage(dpid, newPage);
         if (!newPage) {
-            // Resolve race of multiple threads trying to read the page from disk.
+            // Resolve race of multiple threads trying to read the page from
+            // disk.
             synchronized (cPage) {
                 if (!cPage.valid) {
                     read(cPage);
@@ -157,7 +166,8 @@
 
             CachedPage cPage = null;
             /*
-             * Hash dpid to get a bucket and then check if the page exists in the bucket.
+             * Hash dpid to get a bucket and then check if the page exists in
+             * the bucket.
              */
             int hash = hash(dpid);
             CacheBucket bucket = pageMap[hash];
@@ -175,29 +185,38 @@
                 bucket.bucketLock.unlock();
             }
             /*
-             * If we got here, the page was not in the hash table. Now we ask the page replacement strategy to find us a victim.
+             * If we got here, the page was not in the hash table. Now we ask
+             * the page replacement strategy to find us a victim.
              */
             CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
             if (victim != null) {
                 /*
-                 * We have a victim with the following invariants.
-                 * 1. The dpid on the CachedPage may or may not be valid.
-                 * 2. We have a pin on the CachedPage. We have to deal with three cases here.
-                 *  Case 1: The dpid on the CachedPage is invalid (-1). This indicates that this buffer has never been used.
-                 *  So we are the only ones holding it. Get a lock on the required dpid's hash bucket, check if someone inserted
-                 *  the page we want into the table. If so, decrement the pincount on the victim and return the winner page in the
-                 *  table. If such a winner does not exist, insert the victim and return it.
-                 *  Case 2: The dpid on the CachedPage is valid.
-                 *      Case 2a: The current dpid and required dpid hash to the same bucket.
-                 *      Get the bucket lock, check that the victim is still at pinCount == 1 If so check if there is a winning
-                 *      CachedPage with the required dpid. If so, decrement the pinCount on the victim and return the winner.
-                 *      If not, update the contents of the CachedPage to hold the required dpid and return it. If the picCount
-                 *      on the victim was != 1 or CachedPage was dirty someone used the victim for its old contents -- Decrement
-                 *      the pinCount and retry.
-                 *  Case 2b: The current dpid and required dpid hash to different buckets. Get the two bucket locks in the order
-                 *  of the bucket indexes (Ordering prevents deadlocks). Check for the existence of a winner in the new bucket
-                 *  and for potential use of the victim (pinCount != 1). If everything looks good, remove the CachedPage from
-                 *  the old bucket, and add it to the new bucket and update its header with the new dpid.
+                 * We have a victim with the following invariants. 1. The dpid
+                 * on the CachedPage may or may not be valid. 2. We have a pin
+                 * on the CachedPage. We have to deal with three cases here.
+                 * Case 1: The dpid on the CachedPage is invalid (-1). This
+                 * indicates that this buffer has never been used. So we are the
+                 * only ones holding it. Get a lock on the required dpid's hash
+                 * bucket, check if someone inserted the page we want into the
+                 * table. If so, decrement the pincount on the victim and return
+                 * the winner page in the table. If such a winner does not
+                 * exist, insert the victim and return it. Case 2: The dpid on
+                 * the CachedPage is valid. Case 2a: The current dpid and
+                 * required dpid hash to the same bucket. Get the bucket lock,
+                 * check that the victim is still at pinCount == 1 If so check
+                 * if there is a winning CachedPage with the required dpid. If
+                 * so, decrement the pinCount on the victim and return the
+                 * winner. If not, update the contents of the CachedPage to hold
+                 * the required dpid and return it. If the picCount on the
+                 * victim was != 1 or CachedPage was dirty someone used the
+                 * victim for its old contents -- Decrement the pinCount and
+                 * retry. Case 2b: The current dpid and required dpid hash to
+                 * different buckets. Get the two bucket locks in the order of
+                 * the bucket indexes (Ordering prevents deadlocks). Check for
+                 * the existence of a winner in the new bucket and for potential
+                 * use of the victim (pinCount != 1). If everything looks good,
+                 * remove the CachedPage from the old bucket, and add it to the
+                 * new bucket and update its header with the new dpid.
                  */
                 if (victim.dpid < 0) {
                     /*
@@ -630,7 +649,8 @@
                             }
                             fileInfoMap.remove(entryFileId);
                             unreferencedFileFound = true;
-                            // for-each iterator is invalid because we changed fileInfoMap
+                            // for-each iterator is invalid because we changed
+                            // fileInfoMap
                             break;
                         }
                     }
@@ -764,7 +784,7 @@
             } finally {
                 fileMapManager.unregisterFile(fileId);
                 if (fInfo != null) {
-                    // Mark the fInfo as deleted, 
+                    // Mark the fInfo as deleted,
                     // such that when its pages are reclaimed in openFile(),
                     // the pages are not flushed to disk but only invalidated.
                     if (!fInfo.fileHasBeenDeleted()) {
@@ -775,4 +795,17 @@
             }
         }
     }
+
+    @Override
+    public void start() {
+        // no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) throws IOException {
+        if (dumpState) {
+            os.write(dumpState().getBytes());
+        }
+        close();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
index 13f7d52..df8cc26 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -156,13 +156,13 @@
         return closeFileCount.get();
     }
 
-	@Override
-	public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
-		bufferCache.flushDirtyPage(page);
-	}
+    @Override
+    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+        bufferCache.flushDirtyPage(page);
+    }
 
-	@Override
-	public void force(int fileId, boolean metadata) throws HyracksDataException {
-		bufferCache.force(fileId, metadata);
-	}
+    @Override
+    public void force(int fileId, boolean metadata) throws HyracksDataException {
+        bufferCache.force(fileId, metadata);
+    }
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index e8b407e..8b86a91 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -25,7 +25,7 @@
     public void closeFile(int fileId) throws HyracksDataException;
 
     public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException;
-    
+
     public ICachedPage tryPin(long dpid) throws HyracksDataException;
 
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
@@ -33,9 +33,9 @@
     public void unpin(ICachedPage page) throws HyracksDataException;
 
     public void flushDirtyPage(ICachedPage page) throws HyracksDataException;
-    
+
     public void force(int fileId, boolean metadata) throws HyracksDataException;
-    
+
     public int getPageSize();
 
     public int getNumPages();
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 285ab1f..842c0d1 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.test.support;
 
 import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
@@ -70,9 +71,21 @@
         return null;
     }
 
-	@Override
-	public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
-		// TODO Auto-generated method stub
-		return null;
-	}
+    @Override
+    public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ThreadFactory getThreadFactory() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setThreadFactory(ThreadFactory threadFactory) {
+        // TODO Auto-generated method stub
+
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
index fdb2100..2c6aee5 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -18,6 +18,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -52,6 +53,11 @@
     private static int pageSize;
     private static int numPages;
     private static int maxOpenFiles;
+    private final static ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public static void init(int pageSize, int numPages, int maxOpenFiles) {
         TestStorageManagerComponentHolder.pageSize = pageSize;
@@ -76,7 +82,7 @@
             IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
             IFileMapProvider fileMapProvider = getFileMapProvider(ctx);
             bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, new DelayPageCleanerPolicy(1000),
-                    (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles);
+                    (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles, threadFactory);
         }
         return bufferCache;
     }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 8d6ab38..92f2c11 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -19,6 +19,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
@@ -57,6 +58,11 @@
     private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
     private final IOManager ioManager;
     private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+    private final ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public RuntimeContext(INCApplicationContext appCtx) {
         fileMapManager = new TransientFileMapManager();
@@ -68,7 +74,8 @@
         int numPages = (int) (bufferSize / pageSize);
         /** let the buffer cache never flush dirty pages */
         bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
-                new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
+                new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
+                threadFactory);
         ioManager = (IOManager) appCtx.getRootContext().getIOManager();
         lcManager = new IndexLifecycleManager();
         localResourceRepository = new TransientLocalResourceRepository();