Orchestrated the use of executors and thread factory and added buffer cache core dump.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index 6ee719b..54aac5e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.application;
 
 import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
 import edu.uci.ics.hyracks.api.messages.IMessageBroker;
@@ -39,4 +40,8 @@
 
     public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer();
 
+    public ThreadFactory getThreadFactory();
+
+    public void setThreadFactory(ThreadFactory threadFactory);
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
index 2e7feb5..668950b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
@@ -47,4 +48,6 @@
     public void close(IFileHandle fHandle) throws HyracksDataException;
 
     public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+
+    public void setExecutor(Executor executor);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ApplicationThreadExecutor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ApplicationThreadExecutor.java
deleted file mode 100644
index 948584e..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ApplicationThreadExecutor.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.lifecycle;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-public class ApplicationThreadExecutor implements Executor {
-
-    private final ThreadFactory threadFactory;
-    private final Executor executor = Executors.newCachedThreadPool();
-
-    public ApplicationThreadExecutor(ThreadFactory threadFactory) {
-        this.threadFactory = threadFactory;
-    }
-
-    @Override
-    public void execute(Runnable command) {
-        Thread t = threadFactory.newThread(command);
-        t.setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
-        executor.execute(t);
-    }
-
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
index fb96456..e273863 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
@@ -14,11 +14,12 @@
  */
 package edu.uci.ics.hyracks.api.lifecycle;
 
+import java.io.IOException;
 import java.io.OutputStream;
 
 public interface ILifeCycleComponent {
 
    public void start();
 
-	public void stop(boolean dumpState, OutputStream ouputStream);
+	public void stop(boolean dumpState, OutputStream ouputStream) throws IOException;
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
index 314ac67..8f0580d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -28,17 +28,20 @@
     public static LifeCycleComponentManager INSTANCE = new LifeCycleComponentManager();
 
     public static final class Config {
-        public static final String KEY_DUMP_PATH = "DUMP_PATH";
+        public static final String DUMP_PATH_KEY = "DUMP_PATH";
     }
 
     private static final Logger LOGGER = Logger.getLogger(LifeCycleComponentManager.class.getName());
 
-    private final List<ILifeCycleComponent> components = new ArrayList<ILifeCycleComponent>();
-    private boolean stopInitiated = false;
+    private final List<ILifeCycleComponent> components;
+    private boolean stopInitiated;
     private String dumpPath;
-    private boolean configured = false;
+    private boolean configured;
 
     private LifeCycleComponentManager() {
+        components = new ArrayList<ILifeCycleComponent>();
+        stopInitiated = false;
+        configured = false;
     }
 
     @Override
@@ -122,7 +125,7 @@
 
     @Override
     public void configure(Map<String, String> configuration) {
-        dumpPath = configuration.get(Config.KEY_DUMP_PATH);
+        dumpPath = configuration.get(Config.DUMP_PATH_KEY);
         if (dumpPath == null) {
             dumpPath = System.getProperty("user.dir");
             if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -130,7 +133,7 @@
             }
         }
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.severe("LifecycleComponentManager configurd " + this);
+            LOGGER.severe("LifecycleComponentManager configured " + this);
         }
         configured = true;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 9b8a996..6548ec7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -119,7 +119,7 @@
 
     private final WorkQueue workQueue;
 
-    private final ExecutorService executor;
+    private ExecutorService executor;
 
     private final Timer timer;
 
@@ -140,7 +140,6 @@
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
         ipAddressNodeNameMap = new HashMap<String, Set<String>>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
-        executor = Executors.newCachedThreadPool();
         IIPCI ccIPCI = new ClusterControllerIPCI();
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
@@ -223,6 +222,7 @@
                     .size()]);
             aep.start(appCtx, args);
         }
+        executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index 58ae79e..04ee3bd 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -16,6 +16,7 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
@@ -28,6 +29,11 @@
     protected Serializable distributedState;
     protected IMessageBroker messageBroker;
     protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
+    protected ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public ApplicationContext(ServerContext serverCtx) throws IOException {
         this.serverCtx = serverCtx;
@@ -52,4 +58,14 @@
     public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
         return this.jobSerDeContainer;
     }
+
+    @Override
+    public ThreadFactory getThreadFactory() {
+        return threadFactory;
+    }
+
+    @Override
+    public void setThreadFactory(ThreadFactory threadFactory) {
+        this.threadFactory = threadFactory;
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index a1c3b08..f82aa37 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -99,9 +99,9 @@
 
     private final NetworkManager netManager;
 
-    private final IDatasetPartitionManager datasetPartitionManager;
+    private IDatasetPartitionManager datasetPartitionManager;
 
-    private final DatasetNetworkManager datasetNetworkManager;
+    private DatasetNetworkManager datasetNetworkManager;
 
     private final WorkQueue queue;
 
@@ -115,7 +115,7 @@
 
     private final Map<JobId, Joblet> jobletMap;
 
-    private final ExecutorService executor;
+    private ExecutorService executor;
 
     private NodeParameters nodeParameters;
 
@@ -142,22 +142,17 @@
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
-        executor = Executors.newCachedThreadPool();
         NodeControllerIPCI ipci = new NodeControllerIPCI();
         ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
                 new CCNCFunctions.SerializerDeserializer());
-        this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices), executor));
+
+        this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices)));
         if (id == null) {
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
 
-        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
-                ncConfig.resultHistorySize);
-        datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
-                datasetPartitionManager, ncConfig.nNetThreads);
-
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
         timer = new Timer(true);
@@ -219,6 +214,14 @@
         fv.setValue(ncInfos);
     }
 
+    private void init() throws Exception {
+        ctx.getIOManager().setExecutor(executor);
+        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
+                ncConfig.resultHistorySize);
+        datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
+                datasetPartitionManager, ncConfig.nNetThreads);
+    }
+
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
@@ -226,6 +229,7 @@
         netManager.start();
 
         startApplication();
+        init();
 
         datasetNetworkManager.start();
         IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
@@ -280,6 +284,7 @@
                     .toArray(new String[ncConfig.appArgs.size()]);
             ncAppEntryPoint.start(appCtx, args);
         }
+        executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
index 3b13f32..c94953b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -33,15 +33,19 @@
 public class IOManager implements IIOManager {
     private final List<IODeviceHandle> ioDevices;
 
-    private final Executor executor;
+    private Executor executor;
 
     private final List<IODeviceHandle> workAreaIODevices;
 
     private int workAreaDeviceIndex;
 
     public IOManager(List<IODeviceHandle> devices, Executor executor) throws HyracksException {
-        this.ioDevices = Collections.unmodifiableList(devices);
+        this(devices);
         this.executor = executor;
+    }
+
+    public IOManager(List<IODeviceHandle> devices) throws HyracksException {
+        this.ioDevices = Collections.unmodifiableList(devices);
         workAreaIODevices = new ArrayList<IODeviceHandle>();
         for (IODeviceHandle d : ioDevices) {
             if (d.getWorkAreaPath() != null) {
@@ -55,6 +59,10 @@
         workAreaDeviceIndex = 0;
     }
 
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
     @Override
     public List<IODeviceHandle> getIODevices() {
         return ioDevices;
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
index aa5ce1d..336c3fc 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.examples.btree.helper;
 
+import java.util.concurrent.ThreadFactory;
+
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -42,13 +44,18 @@
     private ILocalResourceRepository localResourceRepository;
     private IIndexLifecycleManager lcManager;
     private ResourceIdFactory resourceIdFactory;
+    private ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public RuntimeContext(INCApplicationContext appCtx) throws HyracksDataException {
         fileMapManager = new TransientFileMapManager();
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
         bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
-                new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100);
+                new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100, threadFactory);
         lcManager = new IndexLifecycleManager();
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory();
         localResourceRepository = localResourceRepositoryFactory.createRepository();
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
index 9badc58..c4f43b9 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IIndexLifecycleManager.java
@@ -3,9 +3,8 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
-public interface IIndexLifecycleManager extends ILifeCycleComponent {
+public interface IIndexLifecycleManager {
     public IIndex getIndex(long resourceID);
 
     public void register(long resourceID, IIndex index) throws HyracksDataException;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index 63557f7..13ae7b1 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -1,5 +1,7 @@
 package edu.uci.ics.hyracks.storage.am.common.dataflow;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -8,199 +10,214 @@
 import java.util.Map;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 
-public class IndexLifecycleManager implements IIndexLifecycleManager {
-	private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100
-																			// megabytes
+public class IndexLifecycleManager implements IIndexLifecycleManager, ILifeCycleComponent {
+    private static final long DEFAULT_MEMORY_BUDGET = 1024 * 1024 * 100; // 100 megabytes
 
-	private final Map<Long, IndexInfo> indexInfos;
-	private final long memoryBudget;
+    private final Map<Long, IndexInfo> indexInfos;
+    private final long memoryBudget;
 
-	private long memoryUsed;
+    private long memoryUsed;
 
-	public IndexLifecycleManager() {
-		this(DEFAULT_MEMORY_BUDGET);
-	}
+    public IndexLifecycleManager() {
+        this(DEFAULT_MEMORY_BUDGET);
+    }
 
-	public IndexLifecycleManager(long memoryBudget) {
-		this.indexInfos = new HashMap<Long, IndexInfo>();
-		this.memoryBudget = memoryBudget;
-		this.memoryUsed = 0;
-		LifeCycleComponentManager.INSTANCE.register(this);
-	}
+    public IndexLifecycleManager(long memoryBudget) {
+        this.indexInfos = new HashMap<Long, IndexInfo>();
+        this.memoryBudget = memoryBudget;
+        this.memoryUsed = 0;
+    }
 
-	private boolean evictCandidateIndex() throws HyracksDataException {
-		// Why min()? As a heuristic for eviction, we will take an open index
-		// (an index consuming memory)
-		// that is not being used (refcount == 0) and has been least recently
-		// used. The sort order defined
-		// for IndexInfo maintains this. See IndexInfo.compareTo().
-		IndexInfo info = Collections.min(indexInfos.values());
-		if (info.referenceCount != 0 || !info.isOpen) {
-			return false;
-		}
+    private boolean evictCandidateIndex() throws HyracksDataException {
+        // Why min()? As a heuristic for eviction, we will take an open index
+        // (an index consuming memory)
+        // that is not being used (refcount == 0) and has been least recently
+        // used. The sort order defined
+        // for IndexInfo maintains this. See IndexInfo.compareTo().
+        IndexInfo info = Collections.min(indexInfos.values());
+        if (info.referenceCount != 0 || !info.isOpen) {
+            return false;
+        }
 
-		info.index.deactivate();
-		memoryUsed -= info.index.getMemoryAllocationSize();
-		info.isOpen = false;
+        info.index.deactivate();
+        memoryUsed -= info.index.getMemoryAllocationSize();
+        info.isOpen = false;
 
-		return true;
-	}
+        return true;
+    }
 
-	@Override
-	public IIndex getIndex(long resourceID) {
-		IndexInfo info = indexInfos.get(resourceID);
-		return info == null ? null : info.index;
-	}
+    @Override
+    public IIndex getIndex(long resourceID) {
+        IndexInfo info = indexInfos.get(resourceID);
+        return info == null ? null : info.index;
+    }
 
-	@Override
-	public void register(long resourceID, IIndex index)
-			throws HyracksDataException {
-		if (indexInfos.containsKey(resourceID)) {
-			throw new HyracksDataException("Index with resource ID "
-					+ resourceID + " already exists.");
-		}
+    @Override
+    public void register(long resourceID, IIndex index) throws HyracksDataException {
+        if (indexInfos.containsKey(resourceID)) {
+            throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
+        }
 
-		indexInfos.put(resourceID, new IndexInfo(index));
-	}
+        indexInfos.put(resourceID, new IndexInfo(index));
+    }
 
-	@Override
-	public void unregister(long resourceID) throws HyracksDataException {
-		IndexInfo info = indexInfos.remove(resourceID);
-		if (info == null) {
-			throw new HyracksDataException("Index with resource ID "
-					+ resourceID + " does not exist.");
-		}
+    @Override
+    public void unregister(long resourceID) throws HyracksDataException {
+        IndexInfo info = indexInfos.remove(resourceID);
+        if (info == null) {
+            throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist.");
+        }
 
-		if (info.referenceCount != 0) {
-			indexInfos.put(resourceID, info);
-			throw new HyracksDataException(
-					"Cannot remove index while it is open.");
-		}
+        if (info.referenceCount != 0) {
+            indexInfos.put(resourceID, info);
+            throw new HyracksDataException("Cannot remove index while it is open.");
+        }
 
-		if (info.isOpen) {
-			info.index.deactivate();
-			memoryUsed -= info.index.getMemoryAllocationSize();
-		}
-	}
+        if (info.isOpen) {
+            info.index.deactivate();
+            memoryUsed -= info.index.getMemoryAllocationSize();
+        }
+    }
 
-	@Override
-	public void open(long resourceID) throws HyracksDataException {
-		IndexInfo info = indexInfos.get(resourceID);
-		if (info == null) {
-			throw new HyracksDataException(
-					"Failed to open index with resource ID " + resourceID
-							+ " since it does not exist.");
-		}
+    @Override
+    public void open(long resourceID) throws HyracksDataException {
+        IndexInfo info = indexInfos.get(resourceID);
+        if (info == null) {
+            throw new HyracksDataException("Failed to open index with resource ID " + resourceID
+                    + " since it does not exist.");
+        }
 
-		long inMemorySize = info.index.getMemoryAllocationSize();
-		while (memoryUsed + inMemorySize > memoryBudget) {
-			if (!evictCandidateIndex()) {
-				throw new HyracksDataException(
-						"Cannot activate index since memory budget would be exceeded.");
-			}
-		}
+        long inMemorySize = info.index.getMemoryAllocationSize();
+        while (memoryUsed + inMemorySize > memoryBudget) {
+            if (!evictCandidateIndex()) {
+                throw new HyracksDataException("Cannot activate index since memory budget would be exceeded.");
+            }
+        }
 
-		if (!info.isOpen) {
-			info.index.activate();
-			info.isOpen = true;
-			memoryUsed += inMemorySize;
-		}
-		info.touch();
-	}
+        if (!info.isOpen) {
+            info.index.activate();
+            info.isOpen = true;
+            memoryUsed += inMemorySize;
+        }
+        info.touch();
+    }
 
-	@Override
-	public void close(long resourceID) {
-		indexInfos.get(resourceID).untouch();
-	}
+    @Override
+    public void close(long resourceID) {
+        indexInfos.get(resourceID).untouch();
+    }
 
-	private class IndexInfo implements Comparable<IndexInfo> {
-		private final IIndex index;
-		private int referenceCount;
-		private long lastAccess;
-		private boolean isOpen;
+    private class IndexInfo implements Comparable<IndexInfo> {
+        private final IIndex index;
+        private int referenceCount;
+        private long lastAccess;
+        private boolean isOpen;
 
-		public IndexInfo(IIndex index) {
-			this.index = index;
-			this.lastAccess = -1;
-			this.referenceCount = 0;
-			this.isOpen = false;
-		}
+        public IndexInfo(IIndex index) {
+            this.index = index;
+            this.lastAccess = -1;
+            this.referenceCount = 0;
+            this.isOpen = false;
+        }
 
-		public void touch() {
-			lastAccess = System.currentTimeMillis();
-			referenceCount++;
-		}
+        public void touch() {
+            lastAccess = System.currentTimeMillis();
+            referenceCount++;
+        }
 
-		public void untouch() {
-			lastAccess = System.currentTimeMillis();
-			referenceCount--;
-		}
+        public void untouch() {
+            lastAccess = System.currentTimeMillis();
+            referenceCount--;
+        }
 
-		@Override
-		public int compareTo(IndexInfo i) {
-			// sort by (isOpen, referenceCount, lastAccess) ascending, where
-			// true < false
-			//
-			// Example sort order:
-			// -------------------
-			// (F, 0, 70) <-- largest
-			// (F, 0, 60)
-			// (T, 10, 80)
-			// (T, 10, 70)
-			// (T, 9, 90)
-			// (T, 0, 100) <-- smallest
-			if (isOpen && !i.isOpen) {
-				return -1;
-			} else if (!isOpen && i.isOpen) {
-				return 1;
-			} else {
-				if (referenceCount < i.referenceCount) {
-					return -1;
-				} else if (referenceCount > i.referenceCount) {
-					return 1;
-				} else {
-					if (lastAccess < i.lastAccess) {
-						return -1;
-					} else if (lastAccess > i.lastAccess) {
-						return 1;
-					} else {
-						return 0;
-					}
-				}
-			}
+        @Override
+        public int compareTo(IndexInfo i) {
+            // sort by (isOpen, referenceCount, lastAccess) ascending, where
+            // true < false
+            //
+            // Example sort order:
+            // -------------------
+            // (F, 0, 70) <-- largest
+            // (F, 0, 60)
+            // (T, 10, 80)
+            // (T, 10, 70)
+            // (T, 9, 90)
+            // (T, 0, 100) <-- smallest
+            if (isOpen && !i.isOpen) {
+                return -1;
+            } else if (!isOpen && i.isOpen) {
+                return 1;
+            } else {
+                if (referenceCount < i.referenceCount) {
+                    return -1;
+                } else if (referenceCount > i.referenceCount) {
+                    return 1;
+                } else {
+                    if (lastAccess < i.lastAccess) {
+                        return -1;
+                    } else if (lastAccess > i.lastAccess) {
+                        return 1;
+                    } else {
+                        return 0;
+                    }
+                }
+            }
 
-		}
+        }
 
-		public String toString() {
-			return "{index: " + index + ", isOpen: " + isOpen + ", refCount: "
-					+ referenceCount + ", lastAccess: " + lastAccess + "}";
-		}
-	}
+        public String toString() {
+            return "{index: " + index + ", isOpen: " + isOpen + ", refCount: " + referenceCount + ", lastAccess: "
+                    + lastAccess + "}";
+        }
+    }
 
-	@Override
-	public List<IIndex> getOpenIndexes() {
-		List<IIndex> openIndexes = new ArrayList<IIndex>();
-		for (IndexInfo i : indexInfos.values()) {
-			if (i.isOpen) {
-				openIndexes.add(i.index);
-			}
-		}
-		return openIndexes;
-	}
+    @Override
+    public List<IIndex> getOpenIndexes() {
+        List<IIndex> openIndexes = new ArrayList<IIndex>();
+        for (IndexInfo i : indexInfos.values()) {
+            if (i.isOpen) {
+                openIndexes.add(i.index);
+            }
+        }
+        return openIndexes;
+    }
 
-	@Override
-	public void start() {
-		// TODO Auto-generated method stub
+    @Override
+    public void start() {
+    }
 
-	}
+    @Override
+    public void stop(boolean dumpState, OutputStream outputStream) throws IOException {
+        if (dumpState) {
+            dumpState(outputStream);
+        }
 
-	@Override
-	public void stop(boolean dumpState, OutputStream ouputStream) {
-		// TODO Auto-generated method stub
+        for (IndexInfo i : indexInfos.values()) {
+            if (i.isOpen) {
+                i.index.deactivate();
+            }
+        }
+    }
 
-	}
+    private void dumpState(OutputStream os) throws IOException {
+        DataOutputStream dos = new DataOutputStream(os);
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(String.format("Memory budget = %d\n", memoryBudget));
+        sb.append(String.format("Memory used = %d\n", memoryUsed));
+
+        String headerFormat = "%-20s %-10s %-20s %-20s %-20s\n";
+        String rowFormat = "%-20d %-10b %-20d %-20s %-20s\n";
+        sb.append(String.format(headerFormat, "ResourceID", "Open", "Reference Count", "Last Access", "Index Name"));
+        IndexInfo ii;
+        for (Map.Entry<Long, IndexInfo> entry : indexInfos.entrySet()) {
+            ii = entry.getValue();
+            sb.append(String.format(rowFormat, entry.getKey(), ii.isOpen, ii.referenceCount, ii.lastAccess, ii.index));
+        }
+        dos.writeUTF(sb.toString());
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
index 99fdb2e..66d8ec2 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.common.freepage;
 
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -204,16 +203,4 @@
     public IFileMapProvider getFileMapProvider() {
         return fileMapManager;
     }
-
-	@Override
-	public void start() {
-		// TODO Auto-generated method stub
-		
-	}
-
-	@Override
-	public void stop(boolean dumpState, OutputStream ouputStream) {
-		// TODO Auto-generated method stub
-		
-	}
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index f8195ab..934575c 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -14,10 +14,15 @@
  */
 package edu.uci.ics.hyracks.storage.common.buffercache;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -31,10 +36,11 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
 
-public class BufferCache implements IBufferCacheInternal {
+public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent {
     private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
     private static final int MAP_FACTOR = 2;
 
@@ -46,7 +52,6 @@
     private final IIOManager ioManager;
     private final int pageSize;
     private final int numPages;
-    private final CachedPage[] cachedPages;
     private final CacheBucket[] pageMap;
     private final IPageReplacementStrategy pageReplacementStrategy;
     private final IPageCleanerPolicy pageCleanerPolicy;
@@ -54,31 +59,34 @@
     private final CleanerThread cleanerThread;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
 
+    private CachedPage[] cachedPages;
+
     private boolean closed;
 
     public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
             IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy,
-            IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
+            IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles, ThreadFactory threadFactory) {
         this.ioManager = ioManager;
         this.pageSize = pageSize;
         this.numPages = numPages;
         this.maxOpenFiles = maxOpenFiles;
         pageReplacementStrategy.setBufferCache(this);
+        pageMap = new CacheBucket[numPages * MAP_FACTOR];
+        for (int i = 0; i < pageMap.length; ++i) {
+            pageMap[i] = new CacheBucket();
+        }
         ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
         cachedPages = new CachedPage[buffers.length];
         for (int i = 0; i < buffers.length; ++i) {
             cachedPages[i] = new CachedPage(i, buffers[i], pageReplacementStrategy);
         }
-        pageMap = new CacheBucket[numPages * MAP_FACTOR];
-        for (int i = 0; i < pageMap.length; ++i) {
-            pageMap[i] = new CacheBucket();
-        }
         this.pageReplacementStrategy = pageReplacementStrategy;
         this.pageCleanerPolicy = pageCleanerPolicy;
         this.fileMapManager = fileMapManager;
+        Executor executor = Executors.newCachedThreadPool(threadFactory);
         fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
         cleanerThread = new CleanerThread();
-        cleanerThread.start();
+        executor.execute(cleanerThread);
         closed = false;
     }
 
@@ -791,13 +799,14 @@
 
     @Override
     public void start() {
-        // TODO Auto-generated method stub
-
+        // no op
     }
 
     @Override
-    public void stop(boolean dumpState, OutputStream ouputStream) {
-        // TODO Auto-generated method stub
-
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+        if (dumpState) {
+            new DataOutputStream(ouputStream).writeUTF(dumpState());
+        }
+        close();
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
index 0a43b1f..df8cc26 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.hyracks.storage.common.buffercache;
 
-import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicLong;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -157,25 +156,13 @@
         return closeFileCount.get();
     }
 
-	@Override
-	public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
-		bufferCache.flushDirtyPage(page);
-	}
-
-	@Override
-	public void force(int fileId, boolean metadata) throws HyracksDataException {
-		bufferCache.force(fileId, metadata);
-	}
-
     @Override
-    public void start() {
-        // TODO Auto-generated method stub
-        
+    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+        bufferCache.flushDirtyPage(page);
     }
 
     @Override
-    public void stop(boolean dumpState, OutputStream ouputStream) {
-        // TODO Auto-generated method stub
-        
+    public void force(int fileId, boolean metadata) throws HyracksDataException {
+        bufferCache.force(fileId, metadata);
     }
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index 3c9882f..8b86a91 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -16,9 +16,8 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
-public interface IBufferCache extends ILifeCycleComponent {
+public interface IBufferCache {
     public void createFile(FileReference fileRef) throws HyracksDataException;
 
     public void openFile(int fileId) throws HyracksDataException;
@@ -26,7 +25,7 @@
     public void closeFile(int fileId) throws HyracksDataException;
 
     public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException;
-    
+
     public ICachedPage tryPin(long dpid) throws HyracksDataException;
 
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
@@ -34,9 +33,9 @@
     public void unpin(ICachedPage page) throws HyracksDataException;
 
     public void flushDirtyPage(ICachedPage page) throws HyracksDataException;
-    
+
     public void force(int fileId, boolean metadata) throws HyracksDataException;
-    
+
     public int getPageSize();
 
     public int getNumPages();
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 285ab1f..842c0d1 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.test.support;
 
 import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
@@ -70,9 +71,21 @@
         return null;
     }
 
-	@Override
-	public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
-		// TODO Auto-generated method stub
-		return null;
-	}
+    @Override
+    public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ThreadFactory getThreadFactory() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setThreadFactory(ThreadFactory threadFactory) {
+        // TODO Auto-generated method stub
+
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
index fdb2100..2c6aee5 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -18,6 +18,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -52,6 +53,11 @@
     private static int pageSize;
     private static int numPages;
     private static int maxOpenFiles;
+    private final static ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public static void init(int pageSize, int numPages, int maxOpenFiles) {
         TestStorageManagerComponentHolder.pageSize = pageSize;
@@ -76,7 +82,7 @@
             IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
             IFileMapProvider fileMapProvider = getFileMapProvider(ctx);
             bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, new DelayPageCleanerPolicy(1000),
-                    (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles);
+                    (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles, threadFactory);
         }
         return bufferCache;
     }
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 8d6ab38..92f2c11 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -19,6 +19,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
@@ -57,6 +58,11 @@
     private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
     private final IOManager ioManager;
     private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+    private final ThreadFactory threadFactory = new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+            return new Thread(r);
+        }
+    };
 
     public RuntimeContext(INCApplicationContext appCtx) {
         fileMapManager = new TransientFileMapManager();
@@ -68,7 +74,8 @@
         int numPages = (int) (bufferSize / pageSize);
         /** let the buffer cache never flush dirty pages */
         bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
-                new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
+                new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000,
+                threadFactory);
         ioManager = (IOManager) appCtx.getRootContext().getIOManager();
         lcManager = new IndexLifecycleManager();
         localResourceRepository = new TransientLocalResourceRepository();