Merge branch 'master' into westmann/doc
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e64f68f..d4d7ac6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -27,6 +27,8 @@
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
@@ -99,10 +101,9 @@
         ioManager = ncApplicationContext.getRootContext().getIOManager();
         bufferCache = new BufferCache(ioManager, allocator, prs, pcp, fileMapManager,
                 storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages(),
-                storageProperties.getBufferCacheMaxOpenFiles());
+                storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
 
         indexLifecycleManager = new IndexLifecycleManager(storageProperties.getMemoryComponentGlobalBudget());
-
         lsmIOScheduler = SynchronousScheduler.INSTANCE;
         mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
         lsmBTreeOpTrackerFactory = new IndexOperationTrackerFactory(LSMBTreeIOOperationCallbackFactory.INSTANCE);
@@ -120,6 +121,14 @@
                 this);
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider);
         isShuttingdown = false;
+
+        // The order of registration is important. The buffer cache must registered before recovery and transaction managers.
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) bufferCache);
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) indexLifecycleManager);
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
     }
 
     public boolean isShuttingdown() {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 5d4f6ec..9b83f00 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -13,6 +13,7 @@
 import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.QueryStatusAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.UpdateAPIServlet;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
 import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -37,12 +38,12 @@
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
         this.appCtx = ccAppCtx;
+
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix cluster controller");
         }
-
+        appCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
         AsterixAppContextInfo.initialize(appCtx);
-
         proxy = AsterixStateProxy.registerRemoteObject();
         appCtx.setDistributedState(proxy);
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4569088..32bfeb5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -1,11 +1,15 @@
 package edu.uci.ics.asterix.hyracks.bootstrap;
 
+import java.io.File;
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.api.common.AsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -19,6 +23,7 @@
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
 
 public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
@@ -32,17 +37,19 @@
 
     @Override
     public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+        ncAppCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
         ncApplicationContext = ncAppCtx;
         nodeId = ncApplicationContext.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Starting Asterix node controller: " + nodeId);
+            LOGGER.info("Starting Asterix node controller  TAKE NOTE: " + nodeId);
         }
+        JVMShutdownHook sHook = new JVMShutdownHook(this);
+        Runtime.getRuntime().addShutdownHook(sHook);
 
+     
         runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
         runtimeContext.initialize();
         ncApplicationContext.setApplicationObject(runtimeContext);
-        JVMShutdownHook sHook = new JVMShutdownHook(this);
-        Runtime.getRuntime().addShutdownHook(sHook);
 
         // #. recover if the system is corrupted by checking system state.
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -78,6 +85,8 @@
             if (isMetadataNode) {
                 MetadataBootstrap.stopUniverse();
             }
+
+            LifeCycleComponentManager.INSTANCE.stopAll(false);
             runtimeContext.deinitialize();
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -89,7 +98,8 @@
     @Override
     public void notifyStartupComplete() throws Exception {
         IAsterixStateProxy proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
-        AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider)runtimeContext).getMetadataProperties();
+        AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+                .getMetadataProperties();
 
         if (systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -113,11 +123,29 @@
             }
             MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
             MetadataManager.INSTANCE.init();
-            MetadataBootstrap.startUniverse( ((IAsterixPropertiesProvider)runtimeContext), ncApplicationContext,
+            MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
                     systemState == SystemState.NEW_UNIVERSE);
             MetadataBootstrap.startDDLRecovery();
         }
 
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting lifecycle components");
+        }
+        
+        Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
+        String key = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
+        String value = metadataProperties.getCoredumpPath(nodeId);
+        lifecycleMgmtConfiguration.put(key, value);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Coredump directory for NC is: " + value);
+        }
+        LifeCycleComponentManager.INSTANCE.configure(lifecycleMgmtConfiguration);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Configured:" + LifeCycleComponentManager.INSTANCE);
+        }
+        
+        LifeCycleComponentManager.INSTANCE.startAll();
+
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
         recoveryMgr.checkpoint(true);
 
diff --git a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..b39fced 100644
--- a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
@@ -5,21 +5,21 @@
 
 <configuration>
 
-  <property>
-    <name>mapred.job.tracker</name>
-    <value>localhost:29007</value>
-  </property>
-  <property>
-     <name>mapred.tasktracker.map.tasks.maximum</name>
-     <value>20</value>
-  </property>
-   <property>
-      <name>mapred.tasktracker.reduce.tasks.maximum</name>
-      <value>20</value>
-   </property>
-   <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
-   </property>
+	<property>
+		<name>mapred.job.tracker</name>
+		<value>localhost:29007</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.map.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.reduce.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.max.split.size</name>
+		<value>128</value>
+	</property>
 
 </configuration>
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
index 653ee6c..527a0e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
@@ -8,4 +8,5 @@
 use dataverse test;
 
 for $x in dataset('TextDataset')
+order by $x.line
 return $x
diff --git a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
index 8af2f5f..59425b1 100644
--- a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
@@ -1,4 +1,4 @@
+{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
+{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
 { "line": "The ASTERIX project is developing new technologies for ingesting, storing, managing, indexing, querying, analyzing, and subscribing to vast quantities of semi-structured information" }
 { "line": "The project is combining ideas from three distinct areas semi-structured data, parallel databases, and data-intensive computing  to create a next-generation, open source software platform that scales by running on large, shared-nothing commodity computing clusters" }
-{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
-{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
new file mode 100644
index 0000000..14975ff
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class AsterixThreadExecutor implements Executor {
+    public final static AsterixThreadExecutor INSTANCE = new AsterixThreadExecutor();
+    private final Executor executor = Executors.newCachedThreadPool(AsterixThreadFactory.INSTANCE);
+
+    private AsterixThreadExecutor() {
+
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        executor.execute(command);
+    }
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
new file mode 100644
index 0000000..7e4735f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.ThreadFactory;
+
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+
+public class AsterixThreadFactory implements ThreadFactory {
+
+    public final static AsterixThreadFactory INSTANCE = new AsterixThreadFactory();
+
+    private AsterixThreadFactory() {
+
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t;
+        if ((r instanceof Thread)) {
+            t = (Thread) r;
+        } else {
+            t = new Thread(r);
+        }
+        t.setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
+        return t;
+    }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
index 6d47e78..6b6cded 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
@@ -24,5 +24,9 @@
     public Set<String> getNodeNames() {
         return accessor.getNodeNames();
     }
+    
+    public String getCoredumpPath(String nodeId){
+        return accessor.getCoredumpPath(nodeId);
+    }
 
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
index 7b2f2a6..d623ae5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
@@ -16,6 +16,7 @@
 import javax.xml.bind.Unmarshaller;
 
 import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
 import edu.uci.ics.asterix.common.configuration.Property;
 import edu.uci.ics.asterix.common.configuration.Store;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -26,6 +27,7 @@
     private final String metadataNodeName;
     private final Set<String> nodeNames;
     private final Map<String, String[]> stores;
+    private final Map<String, String> coredumpConfig;
     private final Map<String, Property> asterixConfigurationParams;
 
     public AsterixPropertiesAccessor() throws AsterixException {
@@ -64,6 +66,11 @@
         for (Property p : asterixConfiguration.getProperty()) {
             asterixConfigurationParams.put(p.getName(), p);
         }
+        coredumpConfig = new HashMap<String, String>();
+        for (Coredump cd : asterixConfiguration.getCoredump()) {
+            coredumpConfig.put(cd.getNcId(), cd.getCoredumpPath());
+        }
+
     }
 
     public String getMetadataNodeName() {
@@ -82,6 +89,10 @@
         return nodeNames;
     }
 
+    public String getCoredumpPath(String nodeId) {
+        return coredumpConfig.get(nodeId);
+    }
+
     public <T> T getProperty(String property, T defaultValue, IPropertyInterpreter<T> interpreter) {
         Property p = asterixConfigurationParams.get(property);
         if (p == null) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 9470d17..c90422c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -50,12 +50,10 @@
             ACIDException;
 
     /**
-     * @param logicalLogLocator
+     * @param lsnValue
      *            TODO
      * @param logicalLogLocator
      *            TODO
-     * @param PhysicalLogLocator
-     *            specifies the location of the log record to be read
      * @throws ACIDException
      */
     public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index a06cc75..a7f2984 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.asterix.common.transactions;
 
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -42,6 +41,8 @@
 
     public void setTransactionType(TransactionType transactionType);
 
+    public String prettyPrint();
+
     public static final long INVALID_TIME = -1l; // used for showing a
     // transaction is not waiting.
     public static final int ACTIVE_STATUS = 0;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 2bfc00d..e57cc64 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -120,5 +120,4 @@
      */
     public ITransactionSubsystem getTransactionProvider();
 
-
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
index 4dc943c..89816aa 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
@@ -105,7 +105,7 @@
     public String getLogDirKey() {
         return logDirKey;
     }
-    
+
     public int getDiskSectorSize() {
         return diskSectorSize;
     }
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index f53fb4b..5aefdbd 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -7,6 +7,7 @@
 
         
 	<xs:element name="metadataNode" type="xs:string" />
+	<xs:element name="coredumpPath" type="xs:string" />
 	<xs:element name="storeDirs" type="xs:string" />
 	<xs:element name="ncId" type="xs:string" />
 	<xs:element name="name" type="xs:string" />
@@ -23,6 +24,15 @@
 		</xs:complexType>
 	</xs:element>
 
+	<xs:element name="coredump">
+		<xs:complexType>
+			<xs:sequence>
+				<xs:element ref="mg:ncId" />
+				<xs:element ref="mg:coredumpPath" />
+			</xs:sequence>
+		</xs:complexType>
+	</xs:element>
+
 	<xs:element name="property">
 		<xs:complexType>
 			<xs:sequence>
@@ -39,6 +49,7 @@
 			<xs:sequence>
 				<xs:element ref="mg:metadataNode" minOccurs="0"/>
 				<xs:element ref="mg:store" maxOccurs="unbounded" />
+				<xs:element ref="mg:coredump" maxOccurs="unbounded" />
 				<xs:element ref="mg:property" minOccurs="0" maxOccurs="unbounded" />
 			</xs:sequence>
 		</xs:complexType>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index e702ef3..63a791b 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -18,7 +18,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 842cd67..aff6967 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -131,7 +131,8 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-dataflow-hadoop</artifactId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>${hyracks.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>jdom</groupId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..8c8880a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -23,9 +23,10 @@
  * A factory class for creating the @see {CNNFeedAdapter}.  
  */
 public class CNNFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
         cnnFeedAdapter.configure(configuration);
         return cnnFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..c267658 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -14,22 +14,79 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HDFSAdapter
  */
+@SuppressWarnings("deprecation")
 public class HDFSAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static transient String SCHEDULER = "hdfs-scheduler";
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private String[] readSchedule;
+    private boolean executed[];
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private boolean setup = false;
+
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> formatClassNames = new HashMap<String, String>();
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        return formatClassNames;
+    }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
-        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        if (!setup) {
+            /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
+            configureJobConf(configuration);
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         hdfsAdapter.configure(configuration);
         return hdfsAdapter;
     }
@@ -39,4 +96,15 @@
         return HDFS_ADAPTER_NAME;
     }
 
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..ba6ade5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,20 +14,92 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HiveAdapter
  */
+@SuppressWarnings("deprecation")
 public class HiveAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final String HDFS_ADAPTER_NAME = "hdfs";
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static transient String SCHEDULER = "hdfs-scheduler";
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    public static final String KEY_FORMAT = "format";
+    public static final String KEY_PARSER_FACTORY = "parser";
+    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+    public static final String FORMAT_ADM = "adm";
+
+    public static final String HIVE_DATABASE = "database";
+    public static final String HIVE_TABLE = "table";
+    public static final String HIVE_HOME = "hive-home";
+    public static final String HIVE_METASTORE_URI = "metastore-uri";
+    public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
+    public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
+
+    private String[] readSchedule;
+    private boolean executed[];
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private boolean setup = false;
+
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> formatClassNames = new HashMap<String, String>();
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        return formatClassNames;
+    }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
-        HiveAdapter hiveAdapter = new HiveAdapter(type);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        if (!setup) {
+            /** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
+            configureJobConf(configuration);
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         hiveAdapter.configure(configuration);
         return hiveAdapter;
     }
@@ -36,4 +108,37 @@
     public String getName() {
         return "hive";
     }
+
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
+
+        /** configure hive */
+        String database = (String) configuration.get(HIVE_DATABASE);
+        String tablePath = null;
+        if (database == null) {
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
+        } else {
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
+                    + configuration.get(HIVE_TABLE);
+        }
+        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
+        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
+            throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
+        }
+
+        if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT) || configuration
+                .get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) {
+            throw new IllegalArgumentException("file input format"
+                    + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
+        }
+
+        /** configure hdfs */
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
index 45fd6cf..697c8ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
@@ -14,12 +14,14 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.io.Serializable;
+
 /**
  * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
  * Acts as a marker interface indicating that the implementation provides functionality
  * for creating an adapter.
  */
-public interface IAdapterFactory {
+public interface IAdapterFactory extends Serializable {
 
     /**
      * Returns the display name corresponding to the Adapter type that is created by the factory.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 093a3dd..e8d120a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -38,6 +38,6 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
index 0f9978e..84a5ca8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
@@ -33,6 +33,6 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception;
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..659fd23 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -26,11 +26,11 @@
  * an NC.
  */
 public class NCFileSystemAdapterFactory implements IGenericDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
         NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
         fsAdapter.configure(configuration);
         return fsAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..e63be17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -25,11 +25,11 @@
  * via pull-based Twitter API.
  */
 public class PullBasedTwitterAdapterFactory implements ITypedDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
         twitterAdapter.configure(configuration);
         return twitterAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..3cd22e8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -24,11 +24,11 @@
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
 public class RSSFeedAdapterFactory implements ITypedDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
         rssFeedAdapter.configure(configuration);
         return rssFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index fb4cc99..f9f72cf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -19,8 +19,6 @@
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -38,61 +36,24 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final String adapterFactory;
-    private final Map<String, String> adapterConfiguration;
+    private final Map<String, Object> adapterConfiguration;
     private final IAType atype;
     private IGenericDatasetAdapterFactory datasourceAdapterFactory;
 
-    public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
-            IAType atype, RecordDescriptor rDesc) {
+    public ExternalDataScanOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
+            RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
         super(spec, 0, 1);
         recordDescriptors[0] = rDesc;
-        this.adapterFactory = adapter;
         this.adapterConfiguration = arguments;
         this.atype = atype;
-    }
-
-    @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
-
-        /*
-        Comment: The following code is commented out. This is because constraints are being set at compile time so that they can 
-        be propagated to upstream Asterix operators. Hyracks has to provide a way to propagate constraints to upstream operators.
-        Once it is there, we will uncomment the following code.  
-        
-        AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
-        switch (constraint.getPartitionConstraintType()) {
-            case ABSOLUTE:
-                String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
-                for (int i = 0; i < locations.length; ++i) {
-                    constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
-                            new ConstantExpression(locations[i])));
-                }
-                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
-                        new ConstantExpression(locations.length)));
-
-                break;
-            case COUNT:
-                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
-                        new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
-                break;
-            default:
-                throw new IllegalStateException(" Constraint type :" + constraint.getPartitionConstraintType()
-                        + " not supported");
-
-        }*/
-
+        this.datasourceAdapterFactory = dataSourceAdapterFactory;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        try {
-            datasourceAdapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactory).newInstance();
-        } catch (Exception e) {
-            throw new HyracksDataException("initialization of adapter failed", e);
-        }
+
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             @Override
             public void initialize() throws HyracksDataException {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index 2da4e76..f07168a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -35,52 +35,46 @@
  * Operator responsible for ingesting data from an external source. This
  * operator uses a (configurable) adapter associated with the feed dataset.
  */
-public class FeedIntakeOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private final String adapterFactoryClassName;
-	private final Map<String, String> adapterConfiguration;
-	private final IAType atype;
-	private final FeedId feedId;
+    private final String adapterFactoryClassName;
+    private final Map<String, Object> adapterConfiguration;
+    private final IAType atype;
+    private final FeedId feedId;
+    private final IAdapterFactory datasourceAdapterFactory;
 
-	private transient IAdapterFactory datasourceAdapterFactory;
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+            Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc,
+            IAdapterFactory datasourceAdapterFactory) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactoryClassName = adapter;
+        this.adapterConfiguration = arguments;
+        this.atype = atype;
+        this.feedId = feedId;
+        this.datasourceAdapterFactory = datasourceAdapterFactory;
+    }
 
-	public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
-			String adapter, Map<String, String> arguments, ARecordType atype,
-			RecordDescriptor rDesc) {
-		super(spec, 1, 1);
-		recordDescriptors[0] = rDesc;
-		this.adapterFactoryClassName = adapter;
-		this.adapterConfiguration = arguments;
-		this.atype = atype;
-		this.feedId = feedId;
-	}
-
-	public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-			IRecordDescriptorProvider recordDescProvider, final int partition,
-			int nPartitions) throws HyracksDataException {
-		ITypedDatasourceAdapter adapter;
-		try {
-			datasourceAdapterFactory = (IAdapterFactory) Class.forName(
-					adapterFactoryClassName).newInstance();
-			if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration, atype);
-			} else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration);
-			} else {
-				throw new IllegalStateException(
-						" Unknown adapter factory type for "
-								+ adapterFactoryClassName);
-			}
-			adapter.initialize(ctx);
-		} catch (Exception e) {
-			throw new HyracksDataException("initialization of adapter failed",
-					e);
-		}
-		return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
-	}
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        ITypedDatasourceAdapter adapter;
+        try {
+            if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration, atype);
+            } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration);
+            } else {
+                throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+            }
+            adapter.initialize(ctx);
+        } catch (Exception e) {
+            throw new HyracksDataException("initialization of adapter failed", e);
+        }
+        return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
index d0dbb98..fd40b03 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
@@ -17,6 +17,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
 import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
@@ -51,7 +52,7 @@
     public void open() throws HyracksDataException {
         if (adapter instanceof IManagedFeedAdapter) {
             feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
-            feedInboxMonitor.start();
+            AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
             feedManager.registerFeedMsgQueue(feedId, inbox);
         }
         writer.open();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 440ee8c..23e545d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -36,7 +36,7 @@
 
     private static final long serialVersionUID = 1L;
 
-    protected Map<String, String> configuration;
+    protected Map<String, Object> configuration;
     protected transient AlgebricksPartitionConstraint partitionConstraint;
     protected IAType atype;
     protected IHyracksTaskContext ctx;
@@ -51,15 +51,15 @@
         typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
     }
 
-    protected static final Map<String, String> formatToParserFactoryMap = initializeFormatParserFactoryMap();
+    protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
 
     public static final String KEY_FORMAT = "format";
     public static final String KEY_PARSER_FACTORY = "parser";
     public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
     public static final String FORMAT_ADM = "adm";
 
-    private static Map<String, String> initializeFormatParserFactoryMap() {
-        Map<String, String> map = new HashMap<String, String>();
+    private static Map<String, Object> initializeFormatParserFactoryMap() {
+        Map<String, Object> map = new HashMap<String, Object>();
         map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
         map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
         return map;
@@ -77,7 +77,7 @@
      * @param attribute
      *            The attribute whose value needs to be obtained.
      */
-    public String getAdapterProperty(String attribute) {
+    public Object getAdapterProperty(String attribute) {
         return configuration.get(attribute);
     }
 
@@ -86,7 +86,7 @@
      * 
      * @return A Map<String,String> instance representing the adapter configuration.
      */
-    public Map<String, String> getConfiguration() {
+    public Map<String, Object> getConfiguration() {
         return configuration;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..8976f7a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -70,9 +70,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..8ab252d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -69,7 +69,7 @@
     public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
 
     @Override
-    public abstract void configure(Map<String, String> arguments) throws Exception;
+    public abstract void configure(Map<String, Object> arguments) throws Exception;
 
     @Override
     public abstract AdapterType getAdapterType();
@@ -82,14 +82,14 @@
     }
 
     protected void configureFormat() throws Exception {
-        String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
+        String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
         if (parserFactoryClassname == null) {
-            String specifiedFormat = configuration.get(KEY_FORMAT);
+            String specifiedFormat = (String) configuration.get(KEY_FORMAT);
             if (specifiedFormat == null) {
                 throw new IllegalArgumentException(" Unspecified data format");
             } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
                 parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
-            } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+            } else if (FORMAT_ADM.equalsIgnoreCase((String)configuration.get(KEY_FORMAT))) {
                 parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
             } else {
                 throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e05b2f..02eacf5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -14,18 +14,9 @@
  */
 package edu.uci.ics.asterix.external.dataset.adapter;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -37,14 +28,9 @@
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
 
 /**
  * Provides functionality for fetching external data stored in an HDFS instance.
@@ -53,118 +39,29 @@
 public class HDFSAdapter extends FileSystemBasedAdapter {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
 
-    public static final String KEY_HDFS_URL = "hdfs";
-    public static final String KEY_INPUT_FORMAT = "input-format";
-    public static final String INPUT_FORMAT_TEXT = "text-input-format";
-    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
-    private Object[] inputSplits;
+    private transient String[] readSchedule;
+    private transient boolean executed[];
+    private transient InputSplit[] inputSplits;
     private transient JobConf conf;
-    private InputSplitsProxy inputSplitsProxy;
-    private static final Map<String, String> formatClassNames = initInputFormatMap();
+    private transient AlgebricksPartitionConstraint clusterLocations;
 
-    private static Map<String, String> initInputFormatMap() {
-        Map<String, String> formatClassNames = new HashMap<String, String>();
-        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
-        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
-        return formatClassNames;
-    }
+    private transient String nodeName;
 
-    public HDFSAdapter(IAType atype) {
+    public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+            AlgebricksPartitionConstraint clusterLocations) {
         super(atype);
+        this.readSchedule = readSchedule;
+        this.executed = executed;
+        this.inputSplits = inputSplits;
+        this.conf = conf;
+        this.clusterLocations = clusterLocations;
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        configuration = arguments;
+    public void configure(Map<String, Object> arguments) throws Exception {
+        this.configuration = arguments;
         configureFormat();
-        configureJobConf();
-        configureSplits();
-    }
-
-    private void configureSplits() throws IOException {
-        if (inputSplitsProxy == null) {
-            inputSplits = conf.getInputFormat().getSplits(conf, 0);
-        }
-        inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
-    }
-
-    private void configurePartitionConstraint() throws Exception {
-        List<String> locations = new ArrayList<String>();
-        Random random = new Random();
-        boolean couldConfigureLocationConstraints = false;
-        try {
-            Map<String, Set<String>> nodeControllers = AsterixRuntimeUtil.getNodeControllerMap();
-            for (Object inputSplit : inputSplits) {
-                String[] dataNodeLocations = ((InputSplit) inputSplit).getLocations();
-                if (dataNodeLocations == null || dataNodeLocations.length == 0) {
-                    throw new IllegalArgumentException("No datanode locations found: check hdfs path");
-                }
-
-                // loop over all replicas until a split location coincides
-                // with an asterix datanode location
-                for (String datanodeLocation : dataNodeLocations) {
-                    Set<String> nodeControllersAtLocation = null;
-                    try {
-                        nodeControllersAtLocation = nodeControllers.get(AsterixRuntimeUtil
-                                .getIPAddress(datanodeLocation));
-                    } catch (UnknownHostException uhe) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.log(Level.WARNING, "Unknown host :" + datanodeLocation);
-                        }
-                        continue;
-                    }
-                    if (nodeControllersAtLocation == null || nodeControllersAtLocation.size() == 0) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.log(Level.WARNING, "No node controller found at " + datanodeLocation
-                                    + " will look at replica location");
-                        }
-                        couldConfigureLocationConstraints = false;
-                    } else {
-                        int locationIndex = random.nextInt(nodeControllersAtLocation.size());
-                        String chosenLocation = (String) nodeControllersAtLocation.toArray()[locationIndex];
-                        locations.add(chosenLocation);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.log(Level.INFO, "split : " + inputSplit + " to be processed by :" + chosenLocation);
-                        }
-                        couldConfigureLocationConstraints = true;
-                        break;
-                    }
-                }
-
-                /* none of the replica locations coincides with an Asterix
-                   node controller location.
-                */
-                if (!couldConfigureLocationConstraints) {
-                    List<String> allNodeControllers = AsterixRuntimeUtil.getAllNodeControllers();
-                    int locationIndex = random.nextInt(allNodeControllers.size());
-                    String chosenLocation = allNodeControllers.get(locationIndex);
-                    locations.add(chosenLocation);
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.log(Level.SEVERE, "No local node controller found to process split : " + inputSplit
-                                + " will be processed by a remote node controller:" + chosenLocation);
-                    }
-                }
-            }
-            partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Encountered exception :" + e + " using count constraints");
-            }
-            partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
-        }
-    }
-
-    private JobConf configureJobConf() throws Exception {
-        conf = new JobConf();
-        conf.set("fs.default.name", configuration.get(KEY_HDFS_URL).trim());
-        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
-        conf.set("mapred.input.dir", configuration.get(KEY_PATH).trim());
-        conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim()));
-        return conf;
     }
 
     public AdapterType getAdapterType() {
@@ -174,7 +71,7 @@
     @Override
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
-        inputSplits = inputSplitsProxy.toInputSplits(conf);
+        this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
     }
 
     private Reporter getReporter() {
@@ -215,98 +112,124 @@
         return reporter;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public InputStream getInputStream(int partition) throws IOException {
-        try {
-            InputStream inputStream;
-            if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
-                SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-                RecordReader reader = format.getRecordReader(
-                        (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
-                inputStream = new HDFSStream(reader, ctx);
-            } else {
-                try {
+
+        return new InputStream() {
+
+            private RecordReader<Object, Text> reader;
+            private Object key;
+            private Text value;
+            private boolean hasMore = false;
+            private int EOL = "\n".getBytes()[0];
+            private Text pendingValue = null;
+            private int currentSplitIndex = 0;
+
+            @SuppressWarnings("unchecked")
+            private boolean moveToNext() throws IOException {
+                for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+                    /**
+                     * read all the partitions scheduled to the current node
+                     */
+                    if (readSchedule[currentSplitIndex].equals(nodeName)) {
+                        /**
+                         * pick an unread split to read
+                         * synchronize among simultaneous partitions in the same machine
+                         */
+                        synchronized (executed) {
+                            if (executed[currentSplitIndex] == false) {
+                                executed[currentSplitIndex] = true;
+                            } else {
+                                continue;
+                            }
+                        }
+
+                        /**
+                         * read the split
+                         */
+                        reader = getRecordReader(currentSplitIndex);
+                        key = reader.createKey();
+                        value = (Text) reader.createValue();
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public int read(byte[] buffer, int offset, int len) throws IOException {
+                if (reader == null) {
+                    if (!moveToNext()) {
+                        //nothing to read
+                        return -1;
+                    }
+                }
+
+                int numBytes = 0;
+                if (pendingValue != null) {
+                    System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+                    buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+                    numBytes += pendingValue.getLength() + 1;
+                    pendingValue = null;
+                }
+
+                while (numBytes < len) {
+                    hasMore = reader.next(key, value);
+                    if (!hasMore) {
+                        while (moveToNext()) {
+                            hasMore = reader.next(key, value);
+                            if (hasMore) {
+                                //move to the next non-empty split
+                                break;
+                            }
+                        }
+                    }
+                    if (!hasMore) {
+                        return (numBytes == 0) ? -1 : numBytes;
+                    }
+                    int sizeOfNextTuple = value.getLength() + 1;
+                    if (numBytes + sizeOfNextTuple > len) {
+                        // cannot add tuple to current buffer
+                        // but the reader has moved pass the fetched tuple
+                        // we need to store this for a subsequent read call.
+                        // and return this then.
+                        pendingValue = value;
+                        break;
+                    } else {
+                        System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+                        buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+                        numBytes += sizeOfNextTuple;
+                    }
+                }
+                return numBytes;
+            }
+
+            @Override
+            public int read() throws IOException {
+                throw new NotImplementedException("Use read(byte[], int, int");
+            }
+
+            private RecordReader getRecordReader(int slitIndex) throws IOException {
+                if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+                    SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+                    RecordReader reader = format.getRecordReader(
+                            (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+                    return reader;
+                } else {
                     TextInputFormat format = (TextInputFormat) conf.getInputFormat();
                     RecordReader reader = format.getRecordReader(
-                            (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
-                    inputStream = new HDFSStream(reader, ctx);
-                } catch (FileNotFoundException e) {
-                    throw new HyracksDataException(e);
+                            (org.apache.hadoop.mapred.FileSplit) inputSplits[slitIndex], conf, getReporter());
+                    return reader;
                 }
             }
-            return inputStream;
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
+
+        };
 
     }
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (partitionConstraint == null) {
-            configurePartitionConstraint();
-        }
-        return partitionConstraint;
-    }
-
-}
-
-class HDFSStream extends InputStream {
-
-    private RecordReader<Object, Text> reader;
-    private final Object key;
-    private final Text value;
-    private boolean hasMore = false;
-    private static final int EOL = "\n".getBytes()[0];
-    private Text pendingValue = null;
-
-    public HDFSStream(RecordReader<Object, Text> reader, IHyracksTaskContext ctx) throws Exception {
-        this.reader = reader;
-        key = reader.createKey();
-        try {
-            value = (Text) reader.createValue();
-        } catch (ClassCastException cce) {
-            throw new Exception("value is not of type org.apache.hadoop.io.Text"
-                    + " type not supported in sequence file format", cce);
-        }
-    }
-
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        int numBytes = 0;
-        if (pendingValue != null) {
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-            buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-            numBytes += pendingValue.getLength() + 1;
-            pendingValue = null;
-        }
-
-        while (numBytes < len) {
-            hasMore = reader.next(key, value);
-            if (!hasMore) {
-                return (numBytes == 0) ? -1 : numBytes;
-            }
-            int sizeOfNextTuple = value.getLength() + 1;
-            if (numBytes + sizeOfNextTuple > len) {
-                // cannot add tuple to current buffer
-                // but the reader has moved pass the fetched tuple
-                // we need to store this for a subsequent read call.
-                // and return this then.
-                pendingValue = value;
-                break;
-            } else {
-                System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-                buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-                numBytes += sizeOfNextTuple;
-            }
-        }
-        return numBytes;
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new NotImplementedException("Use read(byte[], int, int");
+        return clusterLocations;
     }
 
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..5e48834 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,9 @@
 
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,6 +27,7 @@
 /**
  * Provides the functionality of fetching data in form of ADM records from a Hive dataset.
  */
+@SuppressWarnings("deprecation")
 public class HiveAdapter extends AbstractDatasourceAdapter {
 
     private static final long serialVersionUID = 1L;
@@ -37,8 +41,9 @@
 
     private HDFSAdapter hdfsAdapter;
 
-    public HiveAdapter(IAType atype) {
-        this.hdfsAdapter = new HDFSAdapter(atype);
+    public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+            AlgebricksPartitionConstraint clusterLocations) {
+        this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         this.atype = atype;
     }
 
@@ -48,33 +53,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        configuration = arguments;
-        configureHadoopAdapter();
-    }
-
-    private void configureHadoopAdapter() throws Exception {
-        String database = configuration.get(HIVE_DATABASE);
-        String tablePath = null;
-        if (database == null) {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
-        } else {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
-                    + configuration.get(HIVE_TABLE);
-        }
-        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
-        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
-            throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
-        }
-
-        if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
-                .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
-            throw new IllegalArgumentException("file input format" + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
-                    + " is not supported");
-        }
-
-        hdfsAdapter = new HDFSAdapter(atype);
-        hdfsAdapter.configure(configuration);
+    public void configure(Map<String, Object> arguments) throws Exception {
+        this.configuration = arguments;
+        this.hdfsAdapter.configure(arguments);
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
index b0dc32f..031f34f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
@@ -70,7 +70,7 @@
      * @return String the value corresponding to the configuration parameter
      *         represented by the key- attributeKey.
      */
-    public String getAdapterProperty(String propertyKey);
+    public Object getAdapterProperty(String propertyKey);
 
     /**
      * Configures the IDatasourceAdapter instance.
@@ -100,7 +100,7 @@
      *            providing all arguments as a set of (key,value) pairs. These
      *            arguments are put into the metadata.
      */
-    public void configure(Map<String, String> arguments) throws Exception;
+    public void configure(Map<String, Object> arguments) throws Exception;
 
     /**
      * Returns a list of partition constraints. A partition constraint can be a
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index ef39d45..9abc92a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -44,9 +44,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         this.configuration = arguments;
-        String[] splits = arguments.get(KEY_PATH).split(",");
+        String[] splits = ((String) arguments.get(KEY_PATH)).split(",");
         configureFileSplits(splits);
         configureFormat();
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ebfbcad..66d9f98 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -31,9 +31,8 @@
  */
 public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
 
-   
     private static final long serialVersionUID = 1L;
-    
+
     public static final String QUERY = "query";
     public static final String INTERVAL = "interval";
 
@@ -49,7 +48,7 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
         String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
@@ -69,12 +68,12 @@
     }
 
     @Override
-    public void stop()  {
+    public void stop() {
         tweetClient.stop();
     }
 
     @Override
-    public void alter(Map<String, String> properties)  {
+    public void alter(Map<String, String> properties) {
         alterRequested = true;
         this.alteredParams = properties;
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..06cddfd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -63,8 +63,8 @@
         tupleFieldValues = new String[recordType.getFieldNames().length];
     }
 
-    public void initialize(Map<String, String> params) {
-        this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+    public void initialize(Map<String, Object> params) {
+        this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
         this.query = new Query(keywords);
         query.setRpp(100);
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..ccd6516 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -72,9 +72,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
@@ -94,7 +94,7 @@
     }
 
     protected void reconfigure(Map<String, String> arguments) {
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty != null) {
             initializeFeedURLs(rssURLProperty);
         }
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
index 2b884ef..abf0420 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
@@ -49,6 +49,7 @@
 
 import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
 import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.Coredump;
 import edu.uci.ics.asterix.event.driver.EventDriver;
 import edu.uci.ics.asterix.event.management.EventrixClient;
 import edu.uci.ics.asterix.event.management.EventUtil;
@@ -223,6 +224,14 @@
         }
         configuration.setStore(stores);
 
+        List<Coredump> coredump = new ArrayList<Coredump>();
+        String coredumpDir = null;
+        for (Node node : cluster.getNode()) {
+            coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+            coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir));
+        }
+        configuration.setCoredump(coredump);
+
         File asterixConfDir = new File(InstallerDriver.getAsterixDir() + File.separator + asterixInstanceName);
         asterixConfDir.mkdirs();
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index f9f5260..c33d130 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -43,38 +43,29 @@
  * received from the metadata node, to avoid contacting the metadata node
  * repeatedly. We assume that this metadata manager is the only metadata manager
  * in an Asterix cluster. Therefore, no separate cache-invalidation mechanism is
- * needed at this point.
- * Assumptions/Limitations:
- * The metadata subsystem is started during NC Bootstrap start, i.e., when
- * Asterix is deployed.
- * The metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix
- * is undeployed.
- * The metadata subsystem consists of the MetadataManager and the MatadataNode.
- * The MetadataManager provides users access to the metadata.
- * The MetadataNode implements direct access to the storage layer on behalf of
- * the MetadataManager, and translates the binary representation of ADM into
- * Java objects for consumption by the MetadataManager's users.
- * There is exactly one instance of the MetadataManager and of the MetadataNode
- * in the cluster, which may or may not be co-located on the same machine (or in
- * the same JVM).
- * The MetadataManager exists in the same JVM as its user's (e.g., the query
- * compiler).
- * The MetadataNode exists in the same JVM as it's transactional components
- * (LockManager, LogManager, etc.)
- * Users shall access the metadata only through the MetadataManager, and never
- * via the MetadataNode directly.
+ * needed at this point. Assumptions/Limitations: The metadata subsystem is
+ * started during NC Bootstrap start, i.e., when Asterix is deployed. The
+ * metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix is
+ * undeployed. The metadata subsystem consists of the MetadataManager and the
+ * MatadataNode. The MetadataManager provides users access to the metadata. The
+ * MetadataNode implements direct access to the storage layer on behalf of the
+ * MetadataManager, and translates the binary representation of ADM into Java
+ * objects for consumption by the MetadataManager's users. There is exactly one
+ * instance of the MetadataManager and of the MetadataNode in the cluster, which
+ * may or may not be co-located on the same machine (or in the same JVM). The
+ * MetadataManager exists in the same JVM as its user's (e.g., the query
+ * compiler). The MetadataNode exists in the same JVM as it's transactional
+ * components (LockManager, LogManager, etc.) Users shall access the metadata
+ * only through the MetadataManager, and never via the MetadataNode directly.
  * Multiple threads may issue requests to the MetadataManager concurrently. For
  * the sake of accessing metadata, we assume a transaction consists of one
- * thread.
- * Users are responsible for locking the metadata (using the MetadataManager
- * API) before issuing requests.
- * The MetadataNode is responsible for acquiring finer-grained locks on behalf
- * of requests from the MetadataManager. Currently, locks are acquired per
- * BTree, since the BTree does not acquire even finer-grained locks yet
- * internally.
- * The metadata can be queried with AQL DML like any other dataset, but can only
- * be changed with AQL DDL.
- * The transaction ids for metadata transactions must be unique across the
+ * thread. Users are responsible for locking the metadata (using the
+ * MetadataManager API) before issuing requests. The MetadataNode is responsible
+ * for acquiring finer-grained locks on behalf of requests from the
+ * MetadataManager. Currently, locks are acquired per BTree, since the BTree
+ * does not acquire even finer-grained locks yet internally. The metadata can be
+ * queried with AQL DML like any other dataset, but can only be changed with AQL
+ * DDL. The transaction ids for metadata transactions must be unique across the
  * cluster, i.e., metadata transaction ids shall never "accidentally" overlap
  * with transaction ids of regular jobs or other metadata transactions.
  */
@@ -220,7 +211,7 @@
 
     @Override
     public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
-        // add dataset into metadataNode 
+        // add dataset into metadataNode
         try {
             metadataNode.addDataset(ctx.getJobId(), dataset);
         } catch (RemoteException e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 117f492..29bafa9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -34,6 +34,7 @@
 import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
@@ -101,6 +102,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -119,6 +121,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -157,6 +160,7 @@
     private final AsterixStorageProperties storageProperties;
 
     private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+    private static Scheduler hdfsScheduler;
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -178,6 +182,16 @@
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        try {
+            if (hdfsScheduler == null) {
+                //set the singleton hdfs scheduler
+                hdfsScheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
+                        .getClusterControllerInfo().getClientNetPort());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void setJobId(JobId jobId) {
@@ -343,8 +357,8 @@
                 adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
-            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties(),
-                    itemType);
+            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+                    wrapProperties(datasetDetails.getProperties()), itemType);
         } catch (AlgebricksException ae) {
             throw ae;
         } catch (Exception e) {
@@ -362,7 +376,7 @@
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
         ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
-                adapterFactoryClassname, datasetDetails.getProperties(), rt, scannerDesc);
+                wrapPropertiesEmpty(datasetDetails.getProperties()), rt, scannerDesc, adapterFactory);
 
         AlgebricksPartitionConstraint constraint;
         try {
@@ -427,14 +441,15 @@
             }
 
             if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
-                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties());
+                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(wrapProperties(datasetDetails
+                        .getProperties()));
                 adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
             } else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
                 String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
                 adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
                         outputTypeName).getDatatype();
                 adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
-                        datasetDetails.getProperties(), adapterOutputType);
+                        wrapProperties(datasetDetails.getProperties()), adapterOutputType);
             } else {
                 throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
             }
@@ -451,7 +466,8 @@
 
         FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
                 dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
-                datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+                this.wrapPropertiesEmpty(datasetDetails.getProperties()), (ARecordType) adapterOutputType, feedDesc,
+                adapterFactory);
 
         AlgebricksPartitionConstraint constraint = null;
         try {
@@ -1484,4 +1500,32 @@
         return FormatUtils.getDefaultFormat();
     }
 
+    /**
+     * Add HDFS scheduler and the cluster location constraint into the scheduler
+     * 
+     * @param properties
+     *            the original dataset properties
+     * @return a new map containing the original dataset properties and the scheduler/locations
+     */
+    private Map<String, Object> wrapProperties(Map<String, String> properties) {
+        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+        wrappedProperties.putAll(properties);
+        wrappedProperties.put(HDFSAdapterFactory.SCHEDULER, hdfsScheduler);
+        wrappedProperties.put(HDFSAdapterFactory.CLUSTER_LOCATIONS, getClusterLocations());
+        return wrappedProperties;
+    }
+
+    /**
+     * Adapt the original properties to a string-object map
+     * 
+     * @param properties
+     *            the original properties
+     * @return the new stirng-object map
+     */
+    private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+        wrappedProperties.putAll(properties);
+        return wrappedProperties;
+    }
+
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 84b989d..afdf343 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,7 +19,6 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
@@ -47,48 +46,15 @@
         IManagedFeedAdapter {
 
     private static final long serialVersionUID = 1L;
+    private FileSystemBasedAdapter coreAdapter;
+    private String format;
 
-    public static final String KEY_FILE_SYSTEM = "fs";
-    public static final String LOCAL_FS = "localfs";
-    public static final String HDFS = "hdfs";
-
-    private final FileSystemBasedAdapter coreAdapter;
-    private final Map<String, String> configuration;
-    private final String fileSystem;
-    private final String format;
-
-    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration) throws Exception {
+    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+            FileSystemBasedAdapter coreAdapter, String format) throws Exception {
         super(atype);
-        checkRequiredArgs(configuration);
-        fileSystem = configuration.get(KEY_FILE_SYSTEM);
-        String adapterFactoryClass = null;
-        if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
-        } else if (fileSystem.equals(HDFS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
-        } else {
-            throw new AsterixException("Unsupported file system type " + fileSystem);
-        }
-        format = configuration.get(KEY_FORMAT);
-        IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(
-                adapterFactoryClass).newInstance();
-        coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, atype);
         this.configuration = configuration;
-    }
-
-    private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
-        if (configuration.get(KEY_FILE_SYSTEM) == null) {
-            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
-        }
-        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
-            throw new Exception("Record type not specified (output-type-name=?)");
-        }
-        if (configuration.get(KEY_PATH) == null) {
-            throw new Exception("File path not specified (path=?)");
-        }
-        if (configuration.get(KEY_FORMAT) == null) {
-            throw new Exception("File format not specified (format=?)");
-        }
+        this.coreAdapter = coreAdapter;
+        this.format = format;
     }
 
     @Override
@@ -103,7 +69,7 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         coreAdapter.configure(arguments);
     }
 
@@ -189,16 +155,16 @@
 
     private final ARecordType recordType;
     private final IDataParser dataParser;
-    private final Map<String, String> configuration;
+    private final Map<String, Object> configuration;
 
     public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, String> configuration) {
+            char fieldDelimiter, Map<String, Object> configuration) {
         this.recordType = recordType;
         dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
         this.configuration = configuration;
     }
 
-    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
+    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
         this.recordType = recordType;
         dataParser = new ADMDataParser();
         this.configuration = configuration;
@@ -221,10 +187,10 @@
     public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
 
     public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, String> configuration) {
+            Map<String, Object> configuration) {
         super(ctx, recType);
         this.dataParser = dataParser;
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+        String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
         if (propValue != null) {
             interTupleInterval = Long.parseLong(propValue);
         } else {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..bf1c268 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,7 +16,9 @@
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -28,10 +30,37 @@
  * source file has been ingested.
  */
 public class RateControlledFileSystemBasedAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+    
+    public static final String KEY_FILE_SYSTEM = "fs";
+    public static final String LOCAL_FS = "localfs";
+    public static final String HDFS = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_FORMAT = "format";
+
+    private IGenericDatasetAdapterFactory adapterFactory;
+    private String format;
+    private boolean setup = false;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
-        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType type) throws Exception {
+        if (!setup) {
+            checkRequiredArgs(configuration);
+            String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
+            String adapterFactoryClass = null;
+            if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
+            } else if (fileSystem.equals(HDFS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
+            } else {
+                throw new AsterixException("Unsupported file system type " + fileSystem);
+            }
+            format = (String) configuration.get(KEY_FORMAT);
+            adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+            setup = true;
+        }
+        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration,
+                (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, type), format);
     }
 
     @Override
@@ -39,4 +68,19 @@
         return "file_feed";
     }
 
+    private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+        if (configuration.get(KEY_FILE_SYSTEM) == null) {
+            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
+        }
+        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
+            throw new Exception("Record type not specified (output-type-name=?)");
+        }
+        if (configuration.get(KEY_PATH) == null) {
+            throw new Exception("File path not specified (path=?)");
+        }
+        if (configuration.get(KEY_FORMAT) == null) {
+            throw new Exception("File format not specified (format=?)");
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
index d5e525a..d583352 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
@@ -437,6 +437,20 @@
 
         return s.toString();
     }
+    
+    public String coreDump() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\t firstUpgrader: " + firstUpgrader);
+        sb.append("\n\t firstWaiter: " + firstWaiter);
+        sb.append("\n\t lastHolder: " + lastHolder);
+        sb.append("\n\t ISCount: " + ISCount);
+        sb.append("\n\t IXCount: " + IXCount);
+        sb.append("\n\t SCount: " + SCount);
+        sb.append("\n\t XCount: " + XCount);
+        sb.append("\n\t entityResourceHT");
+        sb.append(entityResourceHT.prettyPrint());
+        return sb.toString();
+    }
 
     /////////////////////////////////////////////////////////
     //  set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
index 5d81e8a..206a3bf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
@@ -272,29 +274,35 @@
             if (child.isDeinitialized()) {
                 continue;
             }
-            s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
-            s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
-            s.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
-                    .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
-            for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
-                s.append(j).append(": ");
-                s.append("\t" + child.getJobId(j));
-                s.append("\t" + child.getDatasetId(j));
-                s.append("\t" + child.getPKHashVal(j));
-                s.append("\t" + child.getDatasetLockMode(j));
-                s.append("\t" + child.getDatasetLockCount(j));
-                s.append("\t" + child.getEntityLockMode(j));
-                s.append("\t" + child.getEntityLockCount(j));
-                s.append("\t" + child.getNextEntityActor(j));
-                s.append("\t" + child.getPrevJobResource(j));
-                s.append("\t" + child.getNextJobResource(j));
-                //s.append("\t" + child.getNextDatasetActor(j));
-                s.append("\n");
-            }
-            s.append("\n");
+            s.append("child[" + i + "]");
+            s.append(child.prettyPrint());
         }
         return s.toString();
     }
+    
+    public void coreDump(OutputStream os) {
+        StringBuilder sb = new StringBuilder("\n\t########### EntityLockInfoManager Status #############\n");
+        int size = pArray.size();
+        ChildEntityInfoArrayManager child;
+
+        sb.append("Number of Child: " + size + "\n"); 
+        for (int i = 0; i < size; i++) {
+            try {
+                child = pArray.get(i);
+                sb.append("child[" + i + "]");
+                sb.append(child.prettyPrint());
+                
+                os.write(sb.toString().getBytes());
+            } catch (IOException e) {
+                //ignore IOException
+            }
+            sb = new StringBuilder();
+        }
+    }
+    
+    public int getShrinkTimerThreshold() {
+        return SHRINK_TIMER_THRESHOLD;
+    }
 
     public void initEntityInfo(int slotNum, int jobId, int datasetId, int PKHashVal, byte lockMode) {
         pArray.get(slotNum / ChildEntityInfoArrayManager.NUM_OF_SLOTS).initEntityInfo(
@@ -567,6 +575,29 @@
     public int getFreeSlotNum() {
         return freeSlotNum;
     }
+    
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+        sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+        sb.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
+                .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
+        for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
+            sb.append(j).append(": ");
+            sb.append("\t" + getJobId(j));
+            sb.append("\t" + getDatasetId(j));
+            sb.append("\t" + getPKHashVal(j));
+            sb.append("\t" + getDatasetLockMode(j));
+            sb.append("\t" + getDatasetLockCount(j));
+            sb.append("\t" + getEntityLockMode(j));
+            sb.append("\t" + getEntityLockCount(j));
+            sb.append("\t" + getNextEntityActor(j));
+            sb.append("\t" + getPrevJobResource(j));
+            sb.append("\t" + getNextJobResource(j));
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
 
     //////////////////////////////////////////////////////////////////
     //   set/get method for each field of EntityInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
index ca00aa2..2fae460 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
@@ -271,22 +273,35 @@
             if (child.isDeinitialized()) {
                 continue;
             }
-            s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
-            s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
-            s.append("\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
-            for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
-                s.append(j).append(": ");
-                s.append("\t" + child.getXCount(j));
-                s.append("\t" + child.getSCount(j));
-                s.append("\t" + child.getLastHolder(j));
-                s.append("\t" + child.getFirstWaiter(j));
-                s.append("\t" + child.getUpgrader(j));
-                s.append("\n");
-            }
-            s.append("\n");
+            s.append("child[" + i + "]");
+            s.append(child.prettyPrint());
         }
         return s.toString();
     }
+    
+    public void coreDump(OutputStream os) {
+        StringBuilder sb = new StringBuilder("\n\t########### EntityLockInfoManager Status #############\n");
+        int size = pArray.size();
+        ChildEntityLockInfoArrayManager child;
+
+        sb.append("Number of Child: " + size + "\n"); 
+        for (int i = 0; i < size; i++) {
+            try {
+                child = pArray.get(i);
+                sb.append("child[" + i + "]");
+                sb.append(child.prettyPrint());
+                
+                os.write(sb.toString().getBytes());
+            } catch (IOException e) {
+                //ignore IOException
+            }
+            sb = new StringBuilder();
+        }
+    }
+    
+    public int getShrinkTimerThreshold() {
+        return SHRINK_TIMER_THRESHOLD;
+    }
 
     //debugging method
     public String printWaiters(int slotNum) {
@@ -736,6 +751,23 @@
     public int getFreeSlotNum() {
         return freeSlotNum;
     }
+    
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+        sb.append("\n\tfreeSlotNum:" + getFreeSlotNum());
+        sb.append("\n\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
+        for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
+            sb.append(j).append(": ");
+            sb.append("\t" + getXCount(j));
+            sb.append("\t" + getSCount(j));
+            sb.append("\t" + getLastHolder(j));
+            sb.append("\t" + getFirstWaiter(j));
+            sb.append("\t" + getUpgrader(j));
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
 
     //////////////////////////////////////////////////////////////////
     //   set/get method for each field of EntityLockInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index d846603..8bf5304 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -275,6 +275,17 @@
         }
         return s.toString();
     }
+    
+    public String coreDump() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\t datasetISLockHT");
+        sb.append(datasetISLockHT.prettyPrint());
+        sb.append("\n\t firstWaitingResource: " + firstWaitingResource);
+        sb.append("\n\t lastHoldingResource: " + lastHoldingResource);
+        sb.append("\n\t upgradingResource: " + upgradingResource);
+        sb.append("\n\t jobCtx.jobId: " + jobCtx.getJobId());
+        return sb.toString();
+    }
 
     /////////////////////////////////////////////////////////
     //  set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 8fb7494..cf41199 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -15,10 +15,13 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -33,6 +36,7 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
 /**
  * An implementation of the ILockManager interface for the
@@ -42,7 +46,7 @@
  * @author pouria, kisskys
  */
 
-public class LockManager implements ILockManager {
+public class LockManager implements ILockManager, ILifeCycleComponent {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     //This variable indicates that the dataset granule X lock request is allowed when 
@@ -2046,6 +2050,182 @@
             unlatchLockTable();
         }
     }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+
+            //#. dump Configurable Variables
+            dumpConfVars(os);
+
+            //#. dump jobHT
+            dumpJobInfo(os);
+
+            //#. dump datasetResourceHT
+            dumpDatasetLockInfo(os);
+
+            //#. dump entityLockInfoManager
+            dumpEntityLockInfo(os);
+
+            //#. dump entityInfoManager
+            dumpEntityInfo(os);
+
+            //#. dump lockWaiterManager
+
+            dumpLockWaiterInfo(os);
+            try {
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+
+    private void dumpConfVars(OutputStream os) {
+        try {
+            StringBuilder sb = new StringBuilder();
+            sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+            sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: " + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+            sb.append("\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+                    + entityLockInfoManager.getShrinkTimerThreshold());
+            sb.append("\nSHRINK_TIMER_THRESHOLD (entityInfoManager): " + entityInfoManager.getShrinkTimerThreshold());
+            sb.append("\nSHRINK_TIMER_THRESHOLD (lockWaiterManager): " + lockWaiterManager.getShrinkTimerThreshold());
+            sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpJobInfo(OutputStream os) {
+        JobId jobId;
+        JobInfo jobInfo;
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("\n>>dump_begin\t>>----- [JobInfo] -----");
+            Set<Map.Entry<JobId, JobInfo>> entrySet = jobHT.entrySet();
+            if (entrySet != null) {
+                for (Map.Entry<JobId, JobInfo> entry : entrySet) {
+                    if (entry != null) {
+                        jobId = entry.getKey();
+                        if (jobId != null) {
+                            sb.append("\n" + jobId);
+                        } else {
+                            sb.append("\nJID:null");
+                        }
+
+                        jobInfo = entry.getValue();
+                        if (jobInfo != null) {
+                            sb.append(jobInfo.coreDump());
+                        } else {
+                            sb.append("\nJobInfo:null");
+                        }
+                    }
+                }
+            }
+            sb.append("\n>>dump_end\t>>----- [JobInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpDatasetLockInfo(OutputStream os) {
+        DatasetId datasetId;
+        DatasetLockInfo datasetLockInfo;
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("\n>>dump_begin\t>>----- [DatasetLockInfo] -----");
+            Set<Map.Entry<DatasetId, DatasetLockInfo>> entrySet = datasetResourceHT.entrySet();
+            if (entrySet != null) {
+                for (Map.Entry<DatasetId, DatasetLockInfo> entry : entrySet) {
+                    if (entry != null) {
+                        datasetId = entry.getKey();
+                        if (datasetId != null) {
+                            sb.append("\nDatasetId:" + datasetId.getId());
+                        } else {
+                            sb.append("\nDatasetId:null");
+                        }
+
+                        datasetLockInfo = entry.getValue();
+                        if (datasetLockInfo != null) {
+                            sb.append(datasetLockInfo.coreDump());
+                        } else {
+                            sb.append("\nDatasetLockInfo:null");
+                        }
+                    }
+                    sb.append("\n>>dump_end\t>>----- [DatasetLockInfo] -----\n");
+                    os.write(sb.toString().getBytes());
+
+                    //create a new sb to avoid possible OOM exception
+                    sb = new StringBuilder();
+                }
+            }
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpEntityLockInfo(OutputStream os) {
+        StringBuilder sb = new StringBuilder();
+        try {
+            sb.append("\n>>dump_begin\t>>----- [EntityLockInfo] -----");
+            entityLockInfoManager.coreDump(os);
+            sb.append("\n>>dump_end\t>>----- [EntityLockInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpEntityInfo(OutputStream os) {
+        StringBuilder sb = new StringBuilder();
+        try {
+            sb.append("\n>>dump_begin\t>>----- [EntityInfo] -----");
+            entityInfoManager.coreDump(os);
+            sb.append("\n>>dump_end\t>>----- [EntityInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpLockWaiterInfo(OutputStream os) {
+        StringBuilder sb = new StringBuilder();
+        try {
+            sb.append("\n>>dump_begin\t>>----- [LockWaiterInfo] -----");
+            lockWaiterManager.coreDump(os);
+            sb.append("\n>>dump_end\t>>----- [LockWaiterInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
 }
 
 class ConsecutiveWakeupContext {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
index bd414de..c6fcbd5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 
 /**
@@ -258,28 +260,42 @@
         StringBuilder s = new StringBuilder("\n########### LockWaiterManager Status #############\n");
         int size = pArray.size();
         ChildLockWaiterArrayManager child;
-        LockWaiter waiter;
 
         for (int i = 0; i < size; i++) {
             child = pArray.get(i);
             if (child.isDeinitialized()) {
                 continue;
             }
-            s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
-            s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
-            for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
-                waiter = child.getLockWaiter(j);
-                s.append(j).append(": ");
-                s.append("\t" + waiter.getEntityInfoSlot());
-                s.append("\t" + waiter.needWait());
-                s.append("\t" + waiter.isVictim());
-                s.append("\n");
-            }
-            s.append("\n");
+            s.append("child[" + i + "]");
+            s.append(child.prettyPrint());
         }
         return s.toString();
     }
     
+    public void coreDump(OutputStream os) {
+        StringBuilder sb = new StringBuilder("\n########### LockWaiterManager Status #############\n");
+        int size = pArray.size();
+        ChildLockWaiterArrayManager child;
+
+        sb.append("Number of Child: " + size + "\n"); 
+        for (int i = 0; i < size; i++) {
+            try {
+                child = pArray.get(i);
+                sb.append("child[" + i + "]");
+                sb.append(child.prettyPrint());
+                
+                os.write(sb.toString().getBytes());
+            } catch (IOException e) {
+                //ignore IOException
+            }
+            sb = new StringBuilder();
+        }
+    }
+    
+    public int getShrinkTimerThreshold() {
+        return SHRINK_TIMER_THRESHOLD;
+    }
+    
     public LockWaiter getLockWaiter(int slotNum) {
         return pArray.get(slotNum / ChildLockWaiterArrayManager.NUM_OF_SLOTS).getLockWaiter(
                 slotNum % ChildLockWaiterArrayManager.NUM_OF_SLOTS);
@@ -364,4 +380,20 @@
     public int getFreeSlotNum() {
         return freeSlotNum;
     }
+    
+    public String prettyPrint() {
+        LockWaiter waiter;
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+        sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+        for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
+            waiter = getLockWaiter(j);
+            sb.append(j).append(": ");
+            sb.append("\t" + waiter.getEntityInfoSlot());
+            sb.append("\t" + waiter.needWait());
+            sb.append("\t" + waiter.isVictim());
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
index a53c890..05052f0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
@@ -2,6 +2,7 @@
 
 import java.util.LinkedList;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 
 /**
@@ -26,7 +27,7 @@
         this.lockMgr = lockMgr;
         this.trigger = new Thread(new TimeoutTrigger(this));
         trigger.setDaemon(true);
-        trigger.start();
+        AsterixThreadExecutor.INSTANCE.execute(trigger);
     }
 
     public void sweep() throws ACIDException {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 318996a..205c5f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -17,6 +17,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -31,6 +32,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.transactions.FileBasedBuffer;
 import edu.uci.ics.asterix.common.transactions.FileUtil;
@@ -49,8 +51,9 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
-public class LogManager implements ILogManager {
+public class LogManager implements ILogManager, ILifeCycleComponent {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
@@ -185,7 +188,7 @@
          */
         logPageFlusher = new LogPageFlushThread(this);
         logPageFlusher.setDaemon(true);
-        logPageFlusher.start();
+        AsterixThreadExecutor.INSTANCE.execute(logPageFlusher);
     }
 
     public int getLogPageIndex(long lsnValue) {
@@ -758,6 +761,60 @@
 
         map.clear();
     }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+            //#. dump Configurable Variables
+            dumpConfVars(os);
+
+            //#. dump LSNInfo
+            dumpLSNInfo(os);
+
+            try {
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+
+    private void dumpConfVars(OutputStream os) {
+        try {
+            StringBuilder sb = new StringBuilder();
+            sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+            sb.append(logManagerProperties.toString());
+            sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpLSNInfo(OutputStream os) {
+        try {
+            StringBuilder sb = new StringBuilder();
+            sb.append("\n>>dump_begin\t>>----- [LSNInfo] -----");
+            sb.append("\nstartingLSN: " + startingLSN);
+            sb.append("\ncurrentLSN: " + lsn.get());
+            sb.append("\nlastFlushedLSN: " + lastFlushedLSN.get());
+            sb.append("\n>>dump_end\t>>----- [LSNInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
 }
 
 /*
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 50d4625..1e85c35 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,6 +58,7 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
@@ -83,7 +85,7 @@
  * not in place completely. Once we have physical logging implemented, we would
  * add support for crash recovery.
  */
-public class RecoveryManager implements IRecoveryManager {
+public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
@@ -430,7 +432,7 @@
         if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting sharp checkpoint ... ");
         }
-        
+
         LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
         TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
         String logDir = logMgr.getLogManagerProperties().getLogDir();
@@ -522,7 +524,7 @@
         if (isSharpCheckpoint) {
             logMgr.renewLogFiles();
         }
-        
+
         if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Completed sharp checkpoint.");
         }
@@ -782,6 +784,16 @@
                     + undoCount);
         }
     }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        //no op
+    }
 }
 
 class TxnId {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 2659d8f..d4360ef 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -172,5 +172,15 @@
         return (o == this);
     }
 
-  
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n" + jobId + "\n");
+        sb.append("transactionType: " + transactionType);
+        sb.append("firstLogLocator: " + firstLogLocator.getLsn() + "\n");
+        sb.append("lastLogLocator: " + lastLogLocator.getLsn() + "\n");
+        sb.append("TransactionState: " + txnState + "\n");
+        sb.append("startWaitTime: " + startWaitTime + "\n");
+        sb.append("status: " + status + "\n");
+        return sb.toString();
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 4e8808f..04f12ac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -14,8 +14,11 @@
  */
 package edu.uci.ics.asterix.transaction.management.service.transaction;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -26,15 +29,18 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
 /**
  * An implementation of the @see ITransactionManager interface that provides
  * implementation of APIs for governing the lifecycle of a transaction.
  */
-public class TransactionManager implements ITransactionManager {
+public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
+
+    public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
     private final TransactionSubsystem transactionProvider;
-    private Map<JobId, ITransactionContext> ITransactionContextRepository = new HashMap<JobId, ITransactionContext>();
+    private Map<JobId, ITransactionContext> transactionContextRepository = new HashMap<JobId, ITransactionContext>();
     private AtomicInteger maxJobId = new AtomicInteger(0);
 
     public TransactionManager(TransactionSubsystem provider) {
@@ -61,7 +67,7 @@
             } finally {
                 txnContext.releaseResources();
                 transactionProvider.getLockManager().releaseLocks(txnContext);
-                ITransactionContextRepository.remove(txnContext.getJobId());
+                transactionContextRepository.remove(txnContext.getJobId());
                 txnContext.setTxnState(TransactionState.ABORTED);
             }
         }
@@ -72,7 +78,7 @@
         setMaxJobId(jobId.getId());
         ITransactionContext txnContext = new TransactionContext(jobId, transactionProvider);
         synchronized (this) {
-            ITransactionContextRepository.put(jobId, txnContext);
+            transactionContextRepository.put(jobId, txnContext);
         }
         return txnContext;
     }
@@ -80,13 +86,13 @@
     @Override
     public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
         setMaxJobId(jobId.getId());
-        synchronized (ITransactionContextRepository) {
+        synchronized (transactionContextRepository) {
 
-            ITransactionContext context = ITransactionContextRepository.get(jobId);
+            ITransactionContext context = transactionContextRepository.get(jobId);
             if (context == null) {
-                context = ITransactionContextRepository.get(jobId);
+                context = transactionContextRepository.get(jobId);
                 context = new TransactionContext(jobId, transactionProvider);
-                ITransactionContextRepository.put(jobId, context);
+                transactionContextRepository.put(jobId, context);
             }
             return context;
         }
@@ -107,13 +113,13 @@
             if (PKHashVal != -1) {
                 transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
                 /*****************************
-                try {
-                    //decrease the transaction reference count on index
-                    txnContext.decreaseActiveTransactionCountOnIndexes();
-                } catch (HyracksDataException e) {
-                    throw new ACIDException("failed to complete index operation", e);
-                }
-                *****************************/
+                 * try {
+                 * //decrease the transaction reference count on index
+                 * txnContext.decreaseActiveTransactionCountOnIndexes();
+                 * } catch (HyracksDataException e) {
+                 * throw new ACIDException("failed to complete index operation", e);
+                 * }
+                 *****************************/
                 return;
             }
 
@@ -131,7 +137,7 @@
             } finally {
                 txnContext.releaseResources();
                 transactionProvider.getLockManager().releaseLocks(txnContext); // release
-                ITransactionContextRepository.remove(txnContext.getJobId());
+                transactionContextRepository.remove(txnContext.getJobId());
                 txnContext.setTxnState(TransactionState.COMMITTED);
             }
         }
@@ -151,12 +157,69 @@
     public TransactionSubsystem getTransactionProvider() {
         return transactionProvider;
     }
-    
+
     public void setMaxJobId(int jobId) {
         maxJobId.set(Math.max(maxJobId.get(), jobId));
     }
-    
+
     public int getMaxJobId() {
         return maxJobId.get();
     }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+            //#. dump TxnContext
+            dumpTxnContext(os);
+
+            try {
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+
+    private void dumpTxnContext(OutputStream os) {
+        JobId jobId;
+        ITransactionContext txnCtx;
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+            Set<Map.Entry<JobId, ITransactionContext>> entrySet = transactionContextRepository.entrySet();
+            if (entrySet != null) {
+                for (Map.Entry<JobId, ITransactionContext> entry : entrySet) {
+                    if (entry != null) {
+                        jobId = entry.getKey();
+                        if (jobId != null) {
+                            sb.append("\n" + jobId);
+                        } else {
+                            sb.append("\nJID:null");
+                        }
+
+                        txnCtx = entry.getValue();
+                        if (txnCtx != null) {
+                            sb.append(txnCtx.prettyPrint());
+                        } else {
+                            sb.append("\nTxnCtx:null");
+                        }
+                    }
+                }
+            }
+
+            sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
 }