Merge branch 'master' into zheilbron/asterix_issue470

Conflicts:
	asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
	asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
	asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 4ea338c..7a9ead4 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -24,6 +24,8 @@
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -90,7 +92,7 @@
         ioManager = ncApplicationContext.getRootContext().getIOManager();
         bufferCache = new BufferCache(ioManager, allocator, prs, pcp, fileMapManager,
                 storageProperties.getBufferCachePageSize(), storageProperties.getBufferCacheNumPages(),
-                storageProperties.getBufferCacheMaxOpenFiles());
+                storageProperties.getBufferCacheMaxOpenFiles(), ncApplicationContext.getThreadFactory());
 
         lsmIOScheduler = SynchronousScheduler.INSTANCE;
         mergePolicy = new ConstantMergePolicy(storageProperties.getLSMIndexMergeThreshold(), this);
@@ -105,6 +107,14 @@
                 this);
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider);
         isShuttingdown = false;
+
+        // The order of registration is important. The buffer cache must be registered before the recovery and transaction managers.
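+        // (LifeCycleComponentManager presumably stops components in reverse registration order, so the buffer cache is stopped last, after the transaction components have flushed.)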
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) bufferCache);
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) indexLifecycleManager);
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getLockManager());
+        LifeCycleComponentManager.INSTANCE.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
     }
 
     public boolean isShuttingdown() {
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
index fcea1ea..bb03dc3 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/APIServlet.java
@@ -1,23 +1,20 @@
 package edu.uci.ics.asterix.api.http.servlet;
 
+import java.awt.image.BufferedImage;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.PrintWriter;
-import java.io.File;
-import java.io.FileInputStream;
 import java.util.List;
 import java.util.logging.Level;
-import java.awt.image.BufferedImage;
 
+import javax.imageio.ImageIO;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import javax.activation.MimetypesFileTypeMap;
-import javax.imageio.ImageIO;
 
 import edu.uci.ics.asterix.api.common.APIFramework.DisplayFormat;
 import edu.uci.ics.asterix.api.common.SessionConfig;
@@ -88,10 +85,7 @@
             out.println("<PRE>Duration of all jobs: " + duration + " sec</PRE>");
         } catch (ParseException | TokenMgrError | edu.uci.ics.asterix.aqlplus.parser.TokenMgrError pe) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.INFO, pe.toString(), pe);
-            out.println("<pre class=\"error\">");
-            String errorMessage = ResultUtils.buildParseExceptionMessage(pe, query);
-            out.println(errorMessage);
-            out.println("</pre>");
+            ResultUtils.webUIParseExceptionHandler(out, pe, query);
         } catch (Exception e) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
             ResultUtils.webUIErrorHandler(out, e);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 5d4f6ec..9b83f00 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -13,6 +13,7 @@
 import edu.uci.ics.asterix.api.http.servlet.QueryResultAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.QueryStatusAPIServlet;
 import edu.uci.ics.asterix.api.http.servlet.UpdateAPIServlet;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
 import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.metadata.MetadataManager;
@@ -37,12 +38,12 @@
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
         this.appCtx = ccAppCtx;
+
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix cluster controller");
         }
-
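+        // Use the Asterix thread factory for threads created through the application context.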
+        appCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
         AsterixAppContextInfo.initialize(appCtx);
-
         proxy = AsterixStateProxy.registerRemoteObject();
         appCtx.setDistributedState(proxy);
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4569088..32bfeb5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -1,11 +1,15 @@
 package edu.uci.ics.asterix.hyracks.bootstrap;
 
+import java.io.File;
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.api.common.AsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
 import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -19,6 +23,7 @@
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
 
 public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
     private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
@@ -32,17 +37,19 @@
 
     @Override
     public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+        ncAppCtx.setThreadFactory(AsterixThreadFactory.INSTANCE);
         ncApplicationContext = ncAppCtx;
         nodeId = ncApplicationContext.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix node controller: " + nodeId);
         }
+        JVMShutdownHook sHook = new JVMShutdownHook(this);
+        Runtime.getRuntime().addShutdownHook(sHook);
 
         runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
         runtimeContext.initialize();
         ncApplicationContext.setApplicationObject(runtimeContext);
-        JVMShutdownHook sHook = new JVMShutdownHook(this);
-        Runtime.getRuntime().addShutdownHook(sHook);
 
         // #. recover if the system is corrupted by checking system state.
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -78,6 +85,8 @@
             if (isMetadataNode) {
                 MetadataBootstrap.stopUniverse();
             }
+
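+            // Stop all registered lifecycle components; the "false" argument presumably skips dumping component state on a clean shutdown.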
+            LifeCycleComponentManager.INSTANCE.stopAll(false);
             runtimeContext.deinitialize();
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -89,7 +98,8 @@
     @Override
     public void notifyStartupComplete() throws Exception {
         IAsterixStateProxy proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
-        AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider)runtimeContext).getMetadataProperties();
+        AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+                .getMetadataProperties();
 
         if (systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -113,11 +123,29 @@
             }
             MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
             MetadataManager.INSTANCE.init();
-            MetadataBootstrap.startUniverse( ((IAsterixPropertiesProvider)runtimeContext), ncApplicationContext,
+            MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
                     systemState == SystemState.NEW_UNIVERSE);
             MetadataBootstrap.startDDLRecovery();
         }
 
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting lifecycle components");
+        }
+        
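+        // Configure the lifecycle manager with this NC's coredump directory as its dump path.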
+        Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
+        String key = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
+        String value = metadataProperties.getCoredumpPath(nodeId);
+        lifecycleMgmtConfiguration.put(key, value);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Coredump directory for NC is: " + value);
+        }
+        LifeCycleComponentManager.INSTANCE.configure(lifecycleMgmtConfiguration);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Configured:" + LifeCycleComponentManager.INSTANCE);
+        }
+        
+        LifeCycleComponentManager.INSTANCE.startAll();
+
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
         recoveryMgr.checkpoint(true);
 
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
index 44a54e0..012b7b2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/result/ResultUtils.java
@@ -89,29 +89,20 @@
     }
 
     public static void webUIErrorHandler(PrintWriter out, Exception e) {
-        String errorTemplate = "%s\n%s\n%s";
-        try {
-            String resourcePath = "/webui/errortemplate.html";
-            InputStream is = APIServlet.class.getResourceAsStream(resourcePath);
-            InputStreamReader isr = new InputStreamReader(is);
-            StringBuilder sb = new StringBuilder();
-            BufferedReader br = new BufferedReader(isr);
-            String line = br.readLine();
-
-            while (line != null) {
-                sb.append(line);
-                line = br.readLine();
-            }
-            errorTemplate = sb.toString();
-        } catch (IOException ioe) {
-            // If there is an IOException reading the error template html file, default value of error template is used.
-        }
+        String errorTemplate = readTemplateFile("/webui/errortemplate.html", "%s\n%s\n%s");
 
         String errorOutput = String.format(errorTemplate, extractErrorMessage(e), extractErrorSummary(e),
                 extractFullStackTrace(e));
         out.println(errorOutput);
     }
 
+    public static void webUIParseExceptionHandler(PrintWriter out, Throwable e, String query) {
+        String errorTemplate = readTemplateFile("/webui/errortemplate_message.html", "<pre class=\"error\">%s\n</pre>");
+
+        String errorOutput = String.format(errorTemplate, buildParseExceptionMessage(e, query));
+        out.println(errorOutput);
+    }
+
     public static void apiErrorHandler(PrintWriter out, Exception e) {
         int errorCode = 99;
         if (e instanceof ParseException) {
@@ -211,4 +202,35 @@
         e.printStackTrace(printWriter);
         return stringWriter.toString();
     }
+
+    /**
+     * Read the template file which is stored as a resource and return its content. If the file does not exist or is
+     * not readable, return the default template string.
+     *
+     * @param path
+     *            The path to the resource template file
+     * @param defaultTemplate
+     *            The default template string if the template file does not exist or is not readable
+     * @return The template string to be used to render the output.
+     */
+    private static String readTemplateFile(String path, String defaultTemplate) {
+        String errorTemplate = defaultTemplate;
+        try {
+            InputStream is = APIServlet.class.getResourceAsStream(path);
+            if (is == null) {
+                // Resource not found: fall back to the default template.
+                return defaultTemplate;
+            }
+            InputStreamReader isr = new InputStreamReader(is);
+            StringBuilder sb = new StringBuilder();
+            BufferedReader br = new BufferedReader(isr);
+            String line = br.readLine();
+
+            while (line != null) {
+                sb.append(line);
+                line = br.readLine();
+            }
+            errorTemplate = sb.toString();
+        } catch (IOException ioe) {
+            // If there is an IOException reading the error template html file, default value of error template is used.
+        }
+        return errorTemplate;
+    }
 }
diff --git a/asterix-app/src/main/resources/webui/errortemplate_message.html b/asterix-app/src/main/resources/webui/errortemplate_message.html
new file mode 100644
index 0000000..ea95ccf
--- /dev/null
+++ b/asterix-app/src/main/resources/webui/errortemplate_message.html
@@ -0,0 +1,14 @@
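+<!-- Template consumed by ResultUtils.webUIParseExceptionHandler; the %s placeholder is filled with the parse error message via String.format. -->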
+<div class="accordion" id="errorblock">
+  <div class="accordion-group">
+    <div class="accordion-heading">
+      <a class="accordion-toggle" data-toggle="collapse" data-parent="#errorblock" href="#levelOne">
+        Message
+      </a>
+    </div>
+    <div id="levelOne" class="accordion-body collapse in">
+      <div class="accordion-inner">
+        <pre class="error">%s</pre>
+      </div>
+    </div>
+  </div>
+</div>
diff --git a/asterix-app/src/main/resources/webui/querytemplate.html b/asterix-app/src/main/resources/webui/querytemplate.html
index 40ce161..f1c2e7e 100644
--- a/asterix-app/src/main/resources/webui/querytemplate.html
+++ b/asterix-app/src/main/resources/webui/querytemplate.html
@@ -57,9 +57,14 @@
             var errorPattern = /<div class="accordion" id="errorblock">/g;
             var resultCount = data.match(resPattern);
 
-            if (!resPattern.test(data) && errorPattern.test(data)) {
-                $('#output-heading').html('Error');
-                $('#output-heading').addClass('error');
+            if (!resPattern.test(data)) {
+                if(errorPattern.test(data)) {
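+                  // The response carries the error template's "errorblock" markup, so present it as an error.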
+                  $('#output-heading').html('Error');
+                  $('#output-heading').addClass('error');
+                } else {
+                  $('#output-heading').html('Output');
+                  $('#output-heading').removeClass('error');
+                }
                 $('#output-message').html(data);
             } else {
                 $('#output-heading').html('Output');
diff --git a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..b39fced 100644
--- a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
@@ -5,21 +5,21 @@
 
 <configuration>
 
-  <property>
-    <name>mapred.job.tracker</name>
-    <value>localhost:29007</value>
-  </property>
-  <property>
-     <name>mapred.tasktracker.map.tasks.maximum</name>
-     <value>20</value>
-  </property>
-   <property>
-      <name>mapred.tasktracker.reduce.tasks.maximum</name>
-      <value>20</value>
-   </property>
-   <property>
-      <name>mapred.min.split.size</name>
-      <value>65536</value>
-   </property>
+	<property>
+		<name>mapred.job.tracker</name>
+		<value>localhost:29007</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.map.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.tasktracker.reduce.tasks.maximum</name>
+		<value>20</value>
+	</property>
+	<property>
+		<name>mapred.max.split.size</name>
+		<value>128</value>
+	</property>
 
 </configuration>
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
index 653ee6c..527a0e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
@@ -8,4 +8,5 @@
 use dataverse test;
 
 for $x in dataset('TextDataset')
+order by $x.line
 return $x
diff --git a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
index 8af2f5f..59425b1 100644
--- a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
@@ -1,4 +1,4 @@
+{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
+{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
 { "line": "The ASTERIX project is developing new technologies for ingesting, storing, managing, indexing, querying, analyzing, and subscribing to vast quantities of semi-structured information" }
 { "line": "The project is combining ideas from three distinct areas semi-structured data, parallel databases, and data-intensive computing  to create a next-generation, open source software platform that scales by running on large, shared-nothing commodity computing clusters" }
-{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
-{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
new file mode 100644
index 0000000..14975ff
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadExecutor.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class AsterixThreadExecutor implements Executor {
+    public final static AsterixThreadExecutor INSTANCE = new AsterixThreadExecutor();
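+    // Threads for this pool come from AsterixThreadFactory, so each worker thread
+    // gets the lifecycle manager installed as its uncaught-exception handler.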
+    private final Executor executor = Executors.newCachedThreadPool(AsterixThreadFactory.INSTANCE);
+
+    private AsterixThreadExecutor() {
+
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        executor.execute(command);
+    }
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
new file mode 100644
index 0000000..7e4735f
--- /dev/null
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/api/AsterixThreadFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.common.api;
+
+import java.util.concurrent.ThreadFactory;
+
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
+
+public class AsterixThreadFactory implements ThreadFactory {
+
+    public final static AsterixThreadFactory INSTANCE = new AsterixThreadFactory();
+
+    private AsterixThreadFactory() {
+
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
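+        // If the Runnable is already a Thread, reuse it directly; otherwise wrap it in a new Thread.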
+        Thread t;
+        if (r instanceof Thread) {
+            t = (Thread) r;
+        } else {
+            t = new Thread(r);
+        }
+        t.setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
+        return t;
+    }
+
+}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
index 6d47e78..6b6cded 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
@@ -24,5 +24,9 @@
     public Set<String> getNodeNames() {
         return accessor.getNodeNames();
     }
+
+    public String getCoredumpPath(String nodeId) {
+        return accessor.getCoredumpPath(nodeId);
+    }
 
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
index 7b2f2a6..d623ae5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
@@ -16,6 +16,7 @@
 import javax.xml.bind.Unmarshaller;
 
 import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
 import edu.uci.ics.asterix.common.configuration.Property;
 import edu.uci.ics.asterix.common.configuration.Store;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -26,6 +27,7 @@
     private final String metadataNodeName;
     private final Set<String> nodeNames;
     private final Map<String, String[]> stores;
+    private final Map<String, String> coredumpConfig;
     private final Map<String, Property> asterixConfigurationParams;
 
     public AsterixPropertiesAccessor() throws AsterixException {
@@ -64,6 +66,11 @@
         for (Property p : asterixConfiguration.getProperty()) {
             asterixConfigurationParams.put(p.getName(), p);
         }
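+        // Map each NC id to its configured coredump path (one <coredump> element per node).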
+        coredumpConfig = new HashMap<String, String>();
+        for (Coredump cd : asterixConfiguration.getCoredump()) {
+            coredumpConfig.put(cd.getNcId(), cd.getCoredumpPath());
+        }
+
     }
 
     public String getMetadataNodeName() {
@@ -82,6 +89,10 @@
         return nodeNames;
     }
 
+    public String getCoredumpPath(String nodeId) {
+        return coredumpConfig.get(nodeId);
+    }
+
     public <T> T getProperty(String property, T defaultValue, IPropertyInterpreter<T> interpreter) {
         Property p = asterixConfigurationParams.get(property);
         if (p == null) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 9470d17..c90422c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -50,12 +50,10 @@
             ACIDException;
 
     /**
-     * @param logicalLogLocator
+     * @param lsnValue
      *            TODO
      * @param logicalLogLocator
      *            TODO
-     * @param PhysicalLogLocator
-     *            specifies the location of the log record to be read
      * @throws ACIDException
      */
     public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index 6f4c9f5..80917e0 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -43,6 +43,8 @@
 
     public void setTransactionType(TransactionType transactionType);
 
+    public String prettyPrint();
+
     public static final long INVALID_TIME = -1l; // used for showing a
     // transaction is not waiting.
     public static final int ACTIVE_STATUS = 0;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 2bfc00d..e57cc64 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -120,5 +120,4 @@
      */
     public ITransactionSubsystem getTransactionProvider();
 
-
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
index 4dc943c..89816aa 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogManagerProperties.java
@@ -105,7 +105,7 @@
     public String getLogDirKey() {
         return logDirKey;
     }
-    
+
     public int getDiskSectorSize() {
         return diskSectorSize;
     }
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index f53fb4b..5aefdbd 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -7,6 +7,7 @@
 
         
 	<xs:element name="metadataNode" type="xs:string" />
+	<xs:element name="coredumpPath" type="xs:string" />
 	<xs:element name="storeDirs" type="xs:string" />
 	<xs:element name="ncId" type="xs:string" />
 	<xs:element name="name" type="xs:string" />
@@ -23,6 +24,15 @@
 		</xs:complexType>
 	</xs:element>
 
+	<xs:element name="coredump">
+		<xs:complexType>
+			<xs:sequence>
+				<xs:element ref="mg:ncId" />
+				<xs:element ref="mg:coredumpPath" />
+			</xs:sequence>
+		</xs:complexType>
+	</xs:element>
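+	<!-- Example instance (hypothetical values):
+	     <coredump>
+	         <ncId>nc1</ncId>
+	         <coredumpPath>/var/lib/asterix/coredump</coredumpPath>
+	     </coredump>
+	-->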
+
 	<xs:element name="property">
 		<xs:complexType>
 			<xs:sequence>
@@ -39,6 +49,7 @@
 			<xs:sequence>
 				<xs:element ref="mg:metadataNode" minOccurs="0"/>
 				<xs:element ref="mg:store" maxOccurs="unbounded" />
+				<xs:element ref="mg:coredump" maxOccurs="unbounded" />
 				<xs:element ref="mg:property" minOccurs="0" maxOccurs="unbounded" />
 			</xs:sequence>
 		</xs:complexType>
diff --git a/asterix-doc/pom.xml b/asterix-doc/pom.xml
index d987e5f..4b8cb2f 100644
--- a/asterix-doc/pom.xml
+++ b/asterix-doc/pom.xml
@@ -1,21 +1,21 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<artifactId>asterix</artifactId>
-		<groupId>edu.uci.ics.asterix</groupId>
-		<version>0.0.6-SNAPSHOT</version>
-	</parent>
-	<artifactId>asterix-doc</artifactId>
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-site-plugin</artifactId>
-				<version>3.3</version>
-				<configuration>
-				  <generateReports>false</generateReports>
-				</configuration>
-			</plugin>
-		</plugins>
-	</build>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>asterix</artifactId>
+    <groupId>edu.uci.ics.asterix</groupId>
+    <version>0.0.6-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-doc</artifactId>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <version>3.3</version>
+        <configuration>
+          <generateReports>false</generateReports>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/asterix-doc/src/site/markdown/AsterixDBRestAPI.md b/asterix-doc/src/site/markdown/api.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBRestAPI.md
rename to asterix-doc/src/site/markdown/api.md
diff --git a/asterix-doc/src/site/markdown/AsterixDBDataModel.md b/asterix-doc/src/site/markdown/aql/datamodel.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBDataModel.md
rename to asterix-doc/src/site/markdown/aql/datamodel.md
diff --git a/asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md b/asterix-doc/src/site/markdown/aql/externaldata.md
similarity index 98%
rename from asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md
rename to asterix-doc/src/site/markdown/aql/externaldata.md
index 7319094..e603954 100644
--- a/asterix-doc/src/site/markdown/AccessingExternalDataInAsterixDB.md
+++ b/asterix-doc/src/site/markdown/aql/externaldata.md
@@ -10,7 +10,7 @@
 
 As an example we consider the Lineitem dataset from [TPCH schema](http://www.openlinksw.com/dataspace/doc/dav/wiki/Main/VOSTPCHLinkedData/tpch.sql).
 
-We assume that you have successfully created an ASTERIX instance following the instructions at [Installing Asterix Using Managix](InstallingAsterixUsingManagix.html).
+We assume that you have successfully created an ASTERIX instance following the instructions at [Installing Asterix Using Managix](../install.html).
 _For constructing an example, we assume a single machine setup._
 
 Similar to a regular dataset, an external dataset has an associated datatype.  We shall first create the datatype associated with each record in Lineitem data.
diff --git a/asterix-doc/src/site/markdown/AsterixDBFunctions.md b/asterix-doc/src/site/markdown/aql/functions.md
similarity index 100%
rename from asterix-doc/src/site/markdown/AsterixDBFunctions.md
rename to asterix-doc/src/site/markdown/aql/functions.md
diff --git a/asterix-doc/src/site/markdown/AsterixQueryLanguageReference.md b/asterix-doc/src/site/markdown/aql/manual.md
similarity index 86%
rename from asterix-doc/src/site/markdown/AsterixQueryLanguageReference.md
rename to asterix-doc/src/site/markdown/aql/manual.md
index eddc157..da51dee 100644
--- a/asterix-doc/src/site/markdown/AsterixQueryLanguageReference.md
+++ b/asterix-doc/src/site/markdown/aql/manual.md
@@ -16,6 +16,10 @@
 
 ## 2. Expressions
 
+    Query ::= Expression
+
+An AQL query can be any legal AQL expression.
+    
     Expression ::= ( OperatorExpr | IfThenElse | FLWOR | QuantifiedExpression )
 
 AQL is a fully composable expression language.
@@ -28,12 +32,6 @@
 or a QuantifiedExpression (which yields a boolean value).
 Each will be detailed as we explore the full AQL grammar.
 
-### Queries
-
-    Query ::= Expression
-
-An AQL query can be any legal AQL expression.
-    
 ### Primary Expressions
 
     PrimaryExpr ::= Literal
@@ -226,7 +224,7 @@
 
 AQL has the usual list of suspects, plus one, for comparing pairs of atomic values.
 The "plus one" is the last operator listed above, which is the "roughly equal" operator provided for similarity queries.
-(See the separate document on AsterixDB Similarity Queries for more details on similarity matching.)
+(See the separate document on [AsterixDB Similarity Queries](similarity.html) for more details on similarity matching.)
 
 An example comparison expression (which yields the boolean value true) is shown below.
 
@@ -266,24 +264,24 @@
 The heart of AQL is the FLWOR (for-let-where-orderby-return) expression.
 The roots of this expression were borrowed from the expression of the same name in XQuery.
 A FLWOR expression starts with one or more clauses that establish variable bindings.
-A for clause binds a variable incrementally to each element of its associated expression;
+A `for` clause binds a variable incrementally to each element of its associated expression;
 it includes an optional positional variable for counting/numbering the bindings.
-By default no ordering is implied or assumed by a for clause.
-A let clause binds a variable to the collection of elements computed by its associated expression.
+By default no ordering is implied or assumed by a `for` clause.
+A `let` clause binds a variable to the collection of elements computed by its associated expression.
 
-Following the initial for or let clause(s), a FLWOR expression may contain an arbitrary sequence of other clauses.
-The where clause in a FLWOR expression filters the preceding bindings via a boolean expression, much like a where clause does in a SQL query.
-The order by clause in a FLWOR expression induces an ordering on the data.
-The group by clause, discussed further below, forms groups based on its group by expressions,
+Following the initial `for` or `let` clause(s), a FLWOR expression may contain an arbitrary sequence of other clauses.
+The `where` clause in a FLWOR expression filters the preceding bindings via a boolean expression, much like a `where` clause does in a SQL query.
+The `order by` clause in a FLWOR expression induces an ordering on the data.
+The `group by` clause, discussed further below, forms groups based on its group by expressions,
 optionally naming the expressions' values (which together form the grouping key for the expression).
-The with subclause of a group by clause specifies the variable(s) whose values should be grouped based
+The `with` subclause of a `group by` clause specifies the variable(s) whose values should be grouped based
 on the grouping key(s); following the grouping clause, only the grouping key(s) and the variables named
 in the with subclause remain in scope, and the named grouping variables now contain lists formed from their input values.
-The limit clause caps the number of values returned, optionally starting its result count from a specified offset.
+The `limit` clause caps the number of values returned, optionally starting its result count from a specified offset.
 (Web applications can use this feature for doing pagination.)
-The distinct clause is similar to the group-by clause, but it forms no groups; it serves only to eliminate duplicate values.
+The `distinct` clause is similar to the `group-by` clause, but it forms no groups; it serves only to eliminate duplicate values.
 As indicated by the grammar, the clauses in an AQL query can appear in any order.
-To interpret a query, one can think of data as flowing down through the query from the first clause to the return clause.
+To interpret a query, one can think of data as flowing down through the query from the first clause to the `return` clause.
 
 The following example shows a FLWOR expression that selects and returns one user from the dataset FacebookUsers.
 
@@ -308,7 +306,7 @@
         "message": $message.message
       };
 
-In the next example, a let clause is used to bind a variable to all of a user's FacebookMessages.
+In the next example, a `let` clause is used to bind a variable to all of a user's FacebookMessages.
 The query returns one record per user, with result records containing the user's name and the set of all messages by that user.
 
 ##### Example
@@ -325,7 +323,7 @@
       };
 
 The following example returns all TwitterUsers ordered by their followers count (most followers first) and language.
-Null is treated as being smaller than any other value if nulls are encountered in the ordering key(s).
+When ordering, `null` is treated as being smaller than any other value if `null`s are encountered in the ordering key(s).
 
 ##### Example
 
@@ -333,11 +331,11 @@
       order by $user.followers_count desc, $user.lang asc
       return $user
 
-The next example illustrates the use of the group by clause in AQL.
-After the group by clause in the query, only variables that are either in the group by list or in the with list are in scope.
-The variables in the clause's with list will each contain a collection of items following the group by clause;
+The next example illustrates the use of the `group by` clause in AQL.
+After the `group by` clause in the query, only variables that are either in the `group by` list or in the `with` list are in scope.
+The variables in the clause's `with` list will each contain a collection of items following the `group by` clause;
 the collected items are the values that the source variable was bound to in the tuples that formed the group.
-Null is handled as a single value for grouping.
+For grouping, `null` is handled as a single value.
 
 ##### Example
 
@@ -350,7 +348,7 @@
           "message" : $messages
         }
 
-The use of the limit clause is illustrated in the next example.
+The use of the `limit` clause is illustrated in the next example.
 
 ##### Example
 
@@ -359,11 +357,11 @@
       limit 2
       return $user
 
-The final example shows how AQL's distinct by clause works.
-Each variable in scope before the distinct clause is also in scope after the distinct clause.
-This clause works similarly to group by, but for each variable that contains more than
-one value after the distinct by clause, one value is picked nondeterministically.
-(If the variable is in the distinct by list, then its value will be deterministic.)
+The final example shows how AQL's `distinct by` clause works.
+Each variable in scope before the distinct clause is also in scope after the `distinct by` clause.
+This clause works similarly to `group by`, but for each variable that contains more than
+one value after the `distinct by` clause, one value is picked nondeterministically.
+(If the variable is in the `distinct by` list, then its value will be deterministic.)
 Nulls are treated as a single value when they occur in a grouping field.
 
 ##### Example
@@ -381,8 +379,8 @@
     IfThenElse ::= "if" "(" Expression ")" "then" Expression "else" Expression
 
 A conditional expression is useful for choosing between two alternative values based on a
-boolean condition.  If its first (if) expression is true, its second (then) expression's
-value is returned, and otherwise its third (else) expression is returned.
+boolean condition.  If its first (`if`) expression is true, its second (`then`) expression's
+value is returned, and otherwise its third (`else`) expression is returned.
 
 The following example illustrates the form of a conditional expression.
 ##### Example
@@ -396,17 +394,17 @@
       
 Quantified expressions are used for expressing existential or universal predicates involving the elements of a collection.
 
-The following pair of examples, each of which returns true, illustrate the use of a quantified
-expression to test that every (or some) element in the set [1, 2, 3] of integers is less than three.
+The following pair of examples illustrates the use of a quantified expression to test that every (or some) element in the set [1, 2, 3] of integers is less than three.
+The first example yields `false` and the second example yields `true`.
 
-It is useful to note that if the set were instead the empty set, the first expression would yield true
-("every" value in an empty set satisfies the condition) while the second expression would yield false
+It is useful to note that if the set were instead the empty set, the first expression would yield `true`
+("every" value in an empty set satisfies the condition) while the second expression would yield `false`
 (since there isn't "some" value, as there are no values in the set, that satisfies the condition).
 
 ##### Examples
 
-    every $x in [ 1, 2, 3] satisfies $x < 3
-    some $x in [ 1, 2, 3] satisfies $x < 3
+    every $x in [ 1, 2, 3 ] satisfies $x < 3
+    some $x in [ 1, 2, 3 ] satisfies $x < 3
 
 ## 3. Statements
 
@@ -423,7 +421,8 @@
 
 In addition to expressions for queries, AQL supports a variety of statements for data
 definition and manipulation purposes as well as controlling the context to be used in
-evaluating AQL  expressions.  This section details the statement side of the AQL language.
+evaluating AQL expressions. 
+This section details the statement side of the AQL language.
 
 ### Declarations
  
@@ -437,10 +436,10 @@
 ##### Example
 
     use dataverse TinySocial;
-
-    SetStatement         ::= "set" Identifier StringLiteral
-
+    
 The set statement in AQL is used to control aspects of the expression evaluation context for queries.
+    
+    SetStatement ::= "set" Identifier StringLiteral
 
 As an example, the following set statements request that Jaccard similarity with a similarity threshold 0.6
 be used for set similarity matching when the ~= operator is used in a query expression.
@@ -450,13 +449,13 @@
     set simfunction "jaccard";
     set simthreshold "0.6f"; 
 
-    FunctionDeclaration  ::= "declare" "function" Identifier ParameterList "{" Expression "}"
-    ParameterList        ::= "(" ( <VARIABLE> ( "," <VARIABLE> )* )? ")"
-
 When writing a complex AQL query, it can sometimes be helpful to define one or more
 auxiliary functions that each address a sub-piece of the overall query.
 The declare function statement supports the creation of such helper functions.
 
+    FunctionDeclaration  ::= "declare" "function" Identifier ParameterList "{" Expression "}"
+    ParameterList        ::= "(" ( <VARIABLE> ( "," <VARIABLE> )* )? ")"
+
 The following is a very simple example of a temporary AQL function definition.
 
 ##### Example
@@ -487,7 +486,7 @@
 To ease the authoring of reusable AQL scripts, its optional IfNotExists clause allows creation
 to be requested either unconditionally or only if the dataverse does not already exist.
 If this clause is absent, an error will be returned if the specified dataverse already exists.
-The with format clause is a placeholder for future functionality that can safely be ignored.
+The `with format` clause is a placeholder for future functionality that can safely be ignored.
 
 The following example creates a dataverse named TinySocial.
 
@@ -509,7 +508,7 @@
 
 The create type statement is used to create a new named ADM datatype.
 This type can then be used to create datasets or utilized when defining one or more other ADM datatypes.
-Much more information about the Asterix Data Model (ADM) is available in the data model reference guide to ADM.
+Much more information about the Asterix Data Model (ADM) is available in the [data model reference guide](datamodel.html) to ADM.
 A new type can be a record type, a renaming of another type, an ordered list type, or an unordered list type.
 A record type can be defined as being either open or closed.
 Instances of a closed record type are not permitted to contain fields other than those specified in the create type statement.
@@ -524,12 +523,12 @@
 ##### Example
 
     create type FacebookUserType as closed {
-      id: int32,
-      alias: string,
-      name: string,
+      id:         int32,
+      alias:      string,
+      name:       string,
       user-since: datetime,
       friend-ids: {{ int32 }},
-      employment: [EmploymentType]
+      employment: [ EmploymentType ]
     }
 
 #### Datasets
@@ -557,7 +556,7 @@
 External dataset support allows AQL queries to treat external data as though it were stored in AsterixDB,
 making it possible to query "legacy" file data (e.g., Hive data) without having to physically import it into AsterixDB.
 For an external dataset, an appropriate adaptor must be selected to handle the nature of the desired external data.
-(See the guide to external data for more information on the available adaptors.)
+(See the [guide to external data](externaldata.html) for more information on the available adaptors.)
 
 The following example creates an internal dataset for storing FacebookUserType records.
 It specifies that their id field is their primary key.
@@ -566,7 +565,7 @@
     create internal dataset FacebookUsers(FacebookUserType) primary key id;
 
 The next example creates an external dataset for storing LineitemType records.
-The choice of the localfs adaptor means that its data will reside in the local filesystem of the cluster nodes.
+The choice of the `localfs` adaptor means that its data will reside in the local filesystem of the cluster nodes.
 The create statement provides several parameters used by the localfs adaptor;
 e.g., the file format is delimited text with vertical bar being the field delimiter.
 
@@ -587,9 +586,8 @@
                          | "ngram" "(" <INTEGER_LITERAL> ")"
 
 The create index statement creates a secondary index on one or more fields of a specified dataset.
-Supported index types include btree for totally ordered datatypes,
-rtree for spatial data,
-and keyword and ngram for textual (string) data.
+Supported index types include `btree` for totally ordered datatypes,
+`rtree` for spatial data, and `keyword` and `ngram` for textual (string) data.
 AsterixDB currently requires indexed fields to be part of the named type associated with a dataset.
 (Future plans include support for indexing of open fields as well.)
 
@@ -601,7 +599,9 @@
     create index fbAuthorIdx on FacebookMessages(author-id) type btree;
 
 The following example creates an rtree index called fbSenderLocIdx on the sender-location field of the FacebookMessages dataset.
-This index can be useful for accelerating spatial searches involving the sender-loction field.
+This index can be useful for accelerating queries that use the
+[`spatial-intersect` function](functions.html#spatial-intersect) in a predicate involving the
+sender-location field.
 
 ##### Example
 
@@ -649,20 +649,12 @@
 
     drop dataset FacebookUsers if exists;
 
-##### Example
-
     drop index fbSenderLocIndex;
 
-##### Example
-
     drop type FacebookUserType;
     
-##### Example
-
     drop dataverse TinySocial;
 
-##### Example
-
     drop function add;
 
 ### Import/Export Statements
@@ -671,7 +663,7 @@
     
 The load statement is used to initially populate a dataset via bulk loading of data from an external file.
 An appropriate adaptor must be selected to handle the nature of the desired external data.
-(See the guide to external data for more information on the available adaptors.)
+(See the [guide to external data](externaldata.html) for more information on the available adaptors.)
 
 The following example shows how to bulk load the FacebookUsers dataset from an external file containing
 data that has been prepared in ADM format.
@@ -683,6 +675,8 @@
 
 ### Modification Statements
 
+#### Insert
+
     InsertStatement ::= "insert" "into" "dataset" QualifiedName Query
 
 The AQL insert statement is used to insert data into a dataset.
@@ -699,7 +693,9 @@
     
 ##### Example
 
-    insert into dataset UsersCopy (for $user in dataset FacebookUsers return $user)
+    insert into dataset UsersCopy (for $user in dataset FacebookUsers return $user)
+
+#### Delete
 
     DeleteStatement ::= "delete" Variable "from" "dataset" QualifiedName ( "where" Expression )?
 
@@ -711,7 +707,7 @@
 If the boolean expression for a delete identifies a single object, then the delete statement itself
 will be a single, atomic transaction.
 If the expression identifies multiple objects, then each object deleted will be handled independently
-as a tranaction.
+as a transaction.
 
 The following example illustrates a single-object deletion.
 
diff --git a/asterix-doc/src/site/markdown/AdmAql101.md b/asterix-doc/src/site/markdown/aql/primer.md
similarity index 99%
rename from asterix-doc/src/site/markdown/AdmAql101.md
rename to asterix-doc/src/site/markdown/aql/primer.md
index 994c464..fc8ea9a 100644
--- a/asterix-doc/src/site/markdown/AdmAql101.md
+++ b/asterix-doc/src/site/markdown/aql/primer.md
@@ -12,7 +12,7 @@
 Most importantly, it assumes you already have a running instance of AsterixDB and that you know how to query
 it using AsterixDB's basic web interface.
 For more information on these topics, you should go through the steps in 
-[Installing Asterix Using Managix](InstallingAsterixUsingManagix.html)
+[Installing Asterix Using Managix](../install.html)
 before reading this document and make sure that you have a running AsterixDB instance ready to go.
 To get your feet wet, you should probably start with a simple local installation of AsterixDB on your favorite
 machine, accepting all of the default settings that Managix offers.
@@ -350,7 +350,7 @@
 in the not-too-distant future we will also provide a complete reference manual for the language.
 In the meantime, this will get you started down the path of using AsterixDB.
 A more complete list of the supported AsterixDB primitive types and built-in functions can be
-found at [AsterixDataTypesAndFunctions](AsterixDataTypesAndFunctions.html).
+found at [Asterix Data Model (ADM)](datamodel.html) and [Asterix Functions](functions.html).
 
 AQL is an expression language.
 Even the expression 1+1 is a valid AQL query that evaluates to 2.
diff --git a/asterix-doc/src/site/markdown/SimilarityQuery.md b/asterix-doc/src/site/markdown/aql/similarity.md
similarity index 83%
rename from asterix-doc/src/site/markdown/SimilarityQuery.md
rename to asterix-doc/src/site/markdown/aql/similarity.md
index 6f332ea..244103c 100644
--- a/asterix-doc/src/site/markdown/SimilarityQuery.md
+++ b/asterix-doc/src/site/markdown/aql/similarity.md
@@ -1,7 +1,7 @@
 
-# AsterixDB  Support of Similarity Queries # 
+# AsterixDB  Support of Similarity Queries #
 
-## Motivation ## 
+## Motivation ##
 
 Similarity queries are widely used in applications where users need to
 find records that satisfy a similarity predicate, while exact matching
@@ -14,12 +14,12 @@
 users who have similar friends. To meet this type of need, AsterixDB
 supports similarity queries using efficient indexes and algorithms.
 
-## Data Types and Similarity Functions ## 
+## Data Types and Similarity Functions ##
 
 AsterixDB supports [edit distance](http://en.wikipedia.org/wiki/Levenshtein_distance) (on strings) and
 [Jaccard](http://en.wikipedia.org/wiki/Jaccard_index) (on sets).  For
 instance, in our
-[TinySocial](AdmAql101.html#ADM:_Modeling_Semistructed_Data_in_AsterixDB)
+[TinySocial](primer.html#ADM:_Modeling_Semistructed_Data_in_AsterixDB)
 example, the `friend-ids` of a Facebook user forms a set
 of friends, and we can define a similarity between the sets of
 friends of two users. We can also convert a string to a set of grams of a length "n"
@@ -29,13 +29,13 @@
 `schwarzenegger` are `sch`, `chw`, `hwa`, ..., `ger`.
 
 AsterixDB provides
-[tokenization functions](AsterixDBFunctions.html#Tokenizing_Functions)
+[tokenization functions](functions.html#Tokenizing_Functions)
 to convert strings to sets, and
-[similarity functions](AsterixDBFunctions.html#Similarity_Functions).
+[similarity functions](functions.html#Similarity_Functions).
 
-## Similarity Selection Queries ## 
+## Similarity Selection Queries ##
 
-The following [query](AsterixDBFunctions.html#edit-distance)
+The following [query](functions.html#edit-distance)
 asks for all the Facebook users whose name is similar to
 `Suzanna Tilson`, i.e., their edit distance is at most 2.
 
@@ -47,7 +47,7 @@
         return $user
 
 
-The following [query](AsterixDBFunctions.html#similarity-jaccard)
+The following [query](functions.html#similarity-jaccard)
 asks for all the Facebook users whose set of friend ids is
 similar to `[1,5,9]`, i.e., their Jaccard similarity is at least 0.6.
 
@@ -78,10 +78,10 @@
 using `simfunction` and then specify the threshold `0.6f` using
 `simthreshold`.
 
-## Similarity Join Queries ## 
+## Similarity Join Queries ##
 
 AsterixDB supports fuzzy joins between two sets. The following
-[query](AdmAql101.html#Query_5_-_Fuzzy_Join)
+[query](primer.html#Query_5_-_Fuzzy_Join)
 finds, for each Facebook user, all Twitter users with names
 similar to their name based on the edit distance.
 
@@ -103,7 +103,7 @@
                                 }
         };
 
-## Using Indexes to Support Similarity Queries ## 
+## Using Indexes to Support Similarity Queries ##
 
 AsterixDB uses two types of indexes to support similarity queries, namely
 "ngram index" and "keyword index".
@@ -129,13 +129,13 @@
 The number "3" in "ngram(3)" is the length "n" in the grams. This
 index can be used to optimize similarity queries on this attribute
 using 
-[edit-distance](AsterixDBFunctions.html#edit-distance), 
-[edit-distance-check](AsterixDBFunctions.html#edit-distance-check), 
-[jaccard](AsterixDBFunctions.html#similarity-jaccard),
-or [jaccard-check](AsterixDBFunctions.html#similarity-jaccard-check) 
+[edit-distance](functions.html#edit-distance), 
+[edit-distance-check](functions.html#edit-distance-check), 
+[jaccard](functions.html#similarity-jaccard),
+or [jaccard-check](functions.html#similarity-jaccard-check) 
 queries on this attribute where the
 similarity is defined on sets of 3-grams.  This index can also be used
-to optimize queries with the "[contains()]((AsterixDBFunctions.html#contains))" predicate (i.e., substring
+to optimize queries with the "[contains()](functions.html#contains)" predicate (i.e., substring
 matching) since it can be also be solved by counting on the inverted
 lists of the grams in the query string.
 
@@ -169,6 +169,6 @@
         return $c
         
 As shown above, a keyword index can be used to optimize queries with token-based similarity predicates, including
-[similarity-jaccard](AsterixDBFunctions.html#similarity-jaccard) and
-[similarity-jaccard-check](AsterixDBFunctions.html#similarity-jaccard-check).
+[similarity-jaccard](functions.html#similarity-jaccard) and
+[similarity-jaccard-check](functions.html#similarity-jaccard-check).
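
 A corresponding DDL sketch for a keyword index (the index name
 `fbMessageIdx` and the `message` attribute are illustrative) looks like:

         use dataverse TinySocial;

         create index fbMessageIdx on FacebookMessages(message) type keyword;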
 
diff --git a/asterix-doc/src/site/markdown/index.md b/asterix-doc/src/site/markdown/index.md
index aeb57b4..4ee2a5f 100644
--- a/asterix-doc/src/site/markdown/index.md
+++ b/asterix-doc/src/site/markdown/index.md
@@ -32,23 +32,20 @@
 For the current release, we've got a start; for the Beta release a month or so from now, we will hopefully have much more.
 The following is a list of the wiki pages and supporting documents that we have available today:
 
-1. [InstallingAsterixUsingManagix](InstallingAsterixUsingManagix.html) :
+1. [Installing AsterixDB using Managix](install.html) :
 This is our installation guide, and it is where you should start.
 This document will tell you how to obtain, install, and manage instances of [AsterixDB](https://asterixdb.googlecode.com/files/asterix-installer-0.0.4-binary-assembly.zip), including both single-machine setup (for developers) and cluster installations (for deployment in its intended form).
 
-2. [AdmAql101](AdmAql101.html) :
+2. [AsterixDB 101: An ADM and AQL Primer](aql/primer.html) :
 This is a first-timer's introduction to the user model of the AsterixDB BDMS, by which we mean the view of AsterixDB as seen from the perspective of an "average user" or Big Data application developer.
 The AsterixDB user model consists of its data modeling features (ADM) and its query capabilities (AQL).
 This document presents a tiny "social data warehousing" example and uses it as a backdrop for describing, by example, the key features of AsterixDB.
 By working through this document, you will learn how to define the artifacts needed to manage data in AsterixDB, how to load data into the system, how to use most of the basic features of its query language, and how to insert and delete data dynamically.
 
-3. [AsterixDataTypesAndFunctions](AsterixDataTypesAndFunctions.html) :
-This is a reference document that catalogs the primitive data types and built-in functions available for use in AsterixDB schemas (in ADM) and queries (in AQL).
+3. [Asterix Data Model (ADM)](aql/datamodel.html), [Asterix Functions](aql/functions.html), and [Asterix Query Language (AQL)](aql/manual.html) :
+These are reference documents: the first catalogs the primitive data types available in ADM, the second catalogs the built-in functions available in AQL, and the third is the reference manual for AQL itself.
 
-4. [AQL Reference](AsterixQueryLanguageReference.html) :
-This is the AQL language reference manual.
-
-5. [AsterixDBRestAPI](AsterixDBRestAPI.html) :
+4. [REST API to AsterixDB](api.html) :
 Access to data in an AsterixDB instance is provided via a REST-based API.
 This is a short document that describes the REST API entry points and their URL syntax.
 
diff --git a/asterix-doc/src/site/markdown/InstallingAsterixUsingManagix.md b/asterix-doc/src/site/markdown/install.md
similarity index 100%
rename from asterix-doc/src/site/markdown/InstallingAsterixUsingManagix.md
rename to asterix-doc/src/site/markdown/install.md
diff --git a/asterix-doc/src/site/resources/images/asterixlogo.png b/asterix-doc/src/site/resources/images/asterixlogo.png
new file mode 100644
index 0000000..45cd64f
--- /dev/null
+++ b/asterix-doc/src/site/resources/images/asterixlogo.png
Binary files differ
diff --git a/asterix-doc/src/site/site.xml b/asterix-doc/src/site/site.xml
index 4953eb4..b68941c 100644
--- a/asterix-doc/src/site/site.xml
+++ b/asterix-doc/src/site/site.xml
@@ -3,22 +3,33 @@
 <project name="AsterixDB" xmlns="http://maven.apache.org/DECORATION/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/DECORATION/1.0.0 http://maven.apache.org/xsd/decoration-1.0.0.xsd">
-  <!--
   <bannerLeft>
-    <name>Asterix</name>
-    <src>http://asterix.ics.uci.edu/pic/img9.jpg</src>
-    <href>http://asterix.ics.uci.edu/</href>
+    <name>AsterixDB</name>
+    <src>images/asterixlogo.png</src>
+    <href>/index.html</href>
   </bannerLeft>
-  -->
+  
+  <version position="right"/>
+
+  <poweredBy><logo name="" img=""/></poweredBy>
 
   <skin>
     <groupId>org.apache.maven.skins</groupId>
     <artifactId>maven-fluido-skin</artifactId>
-    <version>1.2.1</version>
+    <version>1.3.0</version>
   </skin>
   <custom>
     <fluidoSkin>
-      <sideBarEnabled>true</sideBarEnabled>
+      <!-- <topBarEnabled>true</topBarEnabled>
+      <topBarIcon>
+        <name>AsterixDB</name>
+        <alt>AsterixDB</alt>
+        <src>images/asterixlogo.png</src>
+        <href>/index.html</href>
+      </topBarIcon>
+      <sideBarEnabled>false</sideBarEnabled> -->
+      <!-- <topBarContainerStyle>width: 90%;</topBarContainerStyle> -->
+      <sourceLineNumbersEnabled>true</sourceLineNumbersEnabled>
       <!-- <googlePlusOne /> -->
     </fluidoSkin>
   </custom>
@@ -29,16 +40,19 @@
     </links>
 
     <menu name="Documentation">
-      <item name="Installing Asterix using Managix" href="InstallingAsterixUsingManagix.html"/>
-      <item name="AsterixDB 101: An ADM and AQL Primer" href="AdmAql101.html"/>
-      <item name="Asterix Data Model (ADM)" href="AsterixDBDataModel.html"/>
-      <item name="AsterixDB Functions" href="AsterixDBFunctions.html"/>
-      <item name="The Asterix Query Language" href="AsterixQueryLanguageReference.html"/>
-      <item name="AsterixDB Support of Similarity Queries" href="AsterixSimilarityQueries.html"/>
-      <item name="Accessing External Data in AsterixDB" href="AccessingExternalDataInAsterixDB.html"/>
-      <item name="REST API to AsterixDB" href="AsterixDBRestAPI.html"/>
+      <item name="Installing AsterixDB using Managix" href="install.html"/>
+      <item name="AsterixDB 101: An ADM and AQL Primer" href="aql/primer.html"/>
+      <item name="Asterix Data Model (ADM)" href="aql/datamodel.html"/>
+      <item name="Asterix Functions" href="aql/functions.html"/>
+      <item name="Asterix Query Language (AQL)" href="aql/manual.html"/>
+      <item name="AQL Support of Similarity Queries" href="aql/similarity.html"/>
+      <item name="Accessing External Data" href="aql/externaldata.html"/>
+      <item name="REST API to AsterixDB" href="api.html"/>
     </menu>
 
     <menu ref="reports"/>
+
+    <footer>&copy; Copyright 2013 University of California, Irvine</footer>
+
   </body>
 </project>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index e702ef3..63a791b 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -18,7 +18,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 842cd67..aff6967 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -131,7 +131,8 @@
 		</dependency>
 		<dependency>
 			<groupId>edu.uci.ics.hyracks</groupId>
-			<artifactId>hyracks-dataflow-hadoop</artifactId>
+			<artifactId>hyracks-hdfs-core</artifactId>
+			<version>${hyracks.version}</version>
 		</dependency>
 		<dependency>
 			<groupId>jdom</groupId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..8c8880a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -23,9 +23,10 @@
  * A factory class for creating the {@link CNNFeedAdapter}.
  */
 public class CNNFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
         cnnFeedAdapter.configure(configuration);
         return cnnFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..c267658 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -14,22 +14,79 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HDFSAdapter
  */
+@SuppressWarnings("deprecation")
 public class HDFSAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
 
     public static final String HDFS_ADAPTER_NAME = "hdfs";
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static final String SCHEDULER = "hdfs-scheduler";
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private String[] readSchedule;
+    private boolean executed[];
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private boolean setup = false;
+
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> formatClassNames = new HashMap<String, String>();
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        return formatClassNames;
+    }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
-        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        if (!setup) {
+            /* Set up the serializable state of the factory; this block should execute only once per factory instance. */
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         hdfsAdapter.configure(configuration);
         return hdfsAdapter;
     }
@@ -39,4 +96,15 @@
         return HDFS_ADAPTER_NAME;
     }
 
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
+
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..ba6ade5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,20 +14,92 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 /**
  * A factory class for creating an instance of HiveAdapter
  */
+@SuppressWarnings("deprecation")
 public class HiveAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final String HDFS_ADAPTER_NAME = "hdfs";
+    public static final String CLUSTER_LOCATIONS = "cluster-locations";
+    public static final String SCHEDULER = "hdfs-scheduler";
+
+    public static final String KEY_HDFS_URL = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_INPUT_FORMAT = "input-format";
+    public static final String INPUT_FORMAT_TEXT = "text-input-format";
+    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+    public static final String KEY_FORMAT = "format";
+    public static final String KEY_PARSER_FACTORY = "parser";
+    public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
+    public static final String FORMAT_ADM = "adm";
+
+    public static final String HIVE_DATABASE = "database";
+    public static final String HIVE_TABLE = "table";
+    public static final String HIVE_HOME = "hive-home";
+    public static final String HIVE_METASTORE_URI = "metastore-uri";
+    public static final String HIVE_WAREHOUSE_DIR = "warehouse-dir";
+    public static final String HIVE_METASTORE_RAWSTORE_IMPL = "rawstore-impl";
+
+    private String[] readSchedule;
+    private boolean executed[];
+    private InputSplitsFactory inputSplitsFactory;
+    private ConfFactory confFactory;
+    private transient AlgebricksPartitionConstraint clusterLocations;
+    private boolean setup = false;
+
+    private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+    private static Map<String, String> initInputFormatMap() {
+        Map<String, String> formatClassNames = new HashMap<String, String>();
+        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+        return formatClassNames;
+    }
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
-        HiveAdapter hiveAdapter = new HiveAdapter(type);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+        if (!setup) {
+            /* Set up the serializable state of the factory; this block should execute only once per factory instance. */
+            JobConf conf = configureJobConf(configuration);
+            confFactory = new ConfFactory(conf);
+
+            clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+            int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+            InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+            inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+            Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+            readSchedule = scheduler.getLocationConstraints(inputSplits);
+            executed = new boolean[readSchedule.length];
+            Arrays.fill(executed, false);
+
+            setup = true;
+        }
+        JobConf conf = confFactory.getConf();
+        InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+        HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         hiveAdapter.configure(configuration);
         return hiveAdapter;
     }
@@ -36,4 +108,37 @@
     public String getName() {
         return "hive";
     }
+
+    private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+        JobConf conf = new JobConf();
+
+        /** configure hive */
+        String database = (String) configuration.get(HIVE_DATABASE);
+        String tablePath = null;
+        if (database == null) {
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
+        } else {
+            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + database + ".db" + "/"
+                    + configuration.get(HIVE_TABLE);
+        }
+        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
+        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
+            throw new IllegalArgumentException("format " + configuration.get(KEY_FORMAT) + " is not supported");
+        }
+
+        if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT) || configuration
+                .get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) {
+            throw new IllegalArgumentException("file input format "
+                    + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
+        }
+
+        /** configure hdfs */
+        conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+        conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+        conf.set("mapred.input.format.class",
+                (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+        return conf;
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
index 45fd6cf..697c8ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
@@ -14,12 +14,14 @@
  */
 package edu.uci.ics.asterix.external.adapter.factory;
 
+import java.io.Serializable;
+
 /**
  * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
  * Acts as a marker interface indicating that the implementation provides functionality
  * for creating an adapter.
  */
-public interface IAdapterFactory {
+public interface IAdapterFactory extends Serializable {
 
     /**
      * Returns the display name corresponding to the Adapter type that is created by the factory.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 093a3dd..e8d120a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -38,6 +38,6 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
index 0f9978e..84a5ca8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
@@ -33,6 +33,6 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception;
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception;
 
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..659fd23 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -26,11 +26,11 @@
  * an NC.
  */
 public class NCFileSystemAdapterFactory implements IGenericDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
         NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
         fsAdapter.configure(configuration);
         return fsAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..e63be17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -25,11 +25,11 @@
  * via pull-based Twitter API.
  */
 public class PullBasedTwitterAdapterFactory implements ITypedDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
         twitterAdapter.configure(configuration);
         return twitterAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..3cd22e8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -24,11 +24,11 @@
  * RSSFeedAdapter provides the functionality of fetching an RSS based feed.
  */
 public class RSSFeedAdapterFactory implements ITypedDatasetAdapterFactory {
-
+    private static final long serialVersionUID = 1L;
     public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
         RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
         rssFeedAdapter.configure(configuration);
         return rssFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index fb4cc99..f9f72cf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -19,8 +19,6 @@
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -38,61 +36,24 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final String adapterFactory;
-    private final Map<String, String> adapterConfiguration;
+    private final Map<String, Object> adapterConfiguration;
     private final IAType atype;
     private IGenericDatasetAdapterFactory datasourceAdapterFactory;
 
-    public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
-            IAType atype, RecordDescriptor rDesc) {
+    public ExternalDataScanOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
+            RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
         super(spec, 0, 1);
         recordDescriptors[0] = rDesc;
-        this.adapterFactory = adapter;
         this.adapterConfiguration = arguments;
         this.atype = atype;
-    }
-
-    @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
-
-        /*
-        Comment: The following code is commented out. This is because constraints are being set at compile time so that they can 
-        be propagated to upstream Asterix operators. Hyracks has to provide a way to propagate constraints to upstream operators.
-        Once it is there, we will uncomment the following code.  
-        
-        AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
-        switch (constraint.getPartitionConstraintType()) {
-            case ABSOLUTE:
-                String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
-                for (int i = 0; i < locations.length; ++i) {
-                    constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
-                            new ConstantExpression(locations[i])));
-                }
-                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
-                        new ConstantExpression(locations.length)));
-
-                break;
-            case COUNT:
-                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
-                        new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
-                break;
-            default:
-                throw new IllegalStateException(" Constraint type :" + constraint.getPartitionConstraintType()
-                        + " not supported");
-
-        }*/
-
+        this.datasourceAdapterFactory = dataSourceAdapterFactory;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
             throws HyracksDataException {
-        try {
-            datasourceAdapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactory).newInstance();
-        } catch (Exception e) {
-            throw new HyracksDataException("initialization of adapter failed", e);
-        }
+
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             @Override
             public void initialize() throws HyracksDataException {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index 2da4e76..f07168a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -35,52 +35,46 @@
  * Operator responsible for ingesting data from an external source. This
  * operator uses a (configurable) adapter associated with the feed dataset.
  */
-public class FeedIntakeOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-	private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-	private final String adapterFactoryClassName;
-	private final Map<String, String> adapterConfiguration;
-	private final IAType atype;
-	private final FeedId feedId;
+    private final String adapterFactoryClassName;
+    private final Map<String, Object> adapterConfiguration;
+    private final IAType atype;
+    private final FeedId feedId;
+    private final IAdapterFactory datasourceAdapterFactory;
 
-	private transient IAdapterFactory datasourceAdapterFactory;
+    public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+            Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc,
+            IAdapterFactory datasourceAdapterFactory) {
+        super(spec, 1, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapterFactoryClassName = adapter;
+        this.adapterConfiguration = arguments;
+        this.atype = atype;
+        this.feedId = feedId;
+        this.datasourceAdapterFactory = datasourceAdapterFactory;
+    }
 
-	public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
-			String adapter, Map<String, String> arguments, ARecordType atype,
-			RecordDescriptor rDesc) {
-		super(spec, 1, 1);
-		recordDescriptors[0] = rDesc;
-		this.adapterFactoryClassName = adapter;
-		this.adapterConfiguration = arguments;
-		this.atype = atype;
-		this.feedId = feedId;
-	}
-
-	public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-			IRecordDescriptorProvider recordDescProvider, final int partition,
-			int nPartitions) throws HyracksDataException {
-		ITypedDatasourceAdapter adapter;
-		try {
-			datasourceAdapterFactory = (IAdapterFactory) Class.forName(
-					adapterFactoryClassName).newInstance();
-			if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration, atype);
-			} else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
-				adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
-						.createAdapter(adapterConfiguration);
-			} else {
-				throw new IllegalStateException(
-						" Unknown adapter factory type for "
-								+ adapterFactoryClassName);
-			}
-			adapter.initialize(ctx);
-		} catch (Exception e) {
-			throw new HyracksDataException("initialization of adapter failed",
-					e);
-		}
-		return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
-	}
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+        ITypedDatasourceAdapter adapter;
+        try {
+            if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration, atype);
+            } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+                adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
+                        .createAdapter(adapterConfiguration);
+            } else {
+                throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+            }
+            adapter.initialize(ctx);
+        } catch (Exception e) {
+            throw new HyracksDataException("initialization of adapter failed", e);
+        }
+        return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
index d0dbb98..fd40b03 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorNodePushable.java
@@ -17,6 +17,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.external.feed.lifecycle.AlterFeedMessage;
 import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
@@ -51,7 +52,7 @@
     public void open() throws HyracksDataException {
         if (adapter instanceof IManagedFeedAdapter) {
             feedInboxMonitor = new FeedInboxMonitor((IManagedFeedAdapter) adapter, inbox, partition);
-            feedInboxMonitor.start();
+            AsterixThreadExecutor.INSTANCE.execute(feedInboxMonitor);
             feedManager.registerFeedMsgQueue(feedId, inbox);
         }
         writer.open();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 440ee8c..23e545d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -36,7 +36,7 @@
 
     private static final long serialVersionUID = 1L;
 
-    protected Map<String, String> configuration;
+    protected Map<String, Object> configuration;
     protected transient AlgebricksPartitionConstraint partitionConstraint;
     protected IAType atype;
     protected IHyracksTaskContext ctx;
@@ -51,15 +51,15 @@
         typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
     }
 
-    protected static final Map<String, String> formatToParserFactoryMap = initializeFormatParserFactoryMap();
+    protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
 
     public static final String KEY_FORMAT = "format";
     public static final String KEY_PARSER_FACTORY = "parser";
     public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
     public static final String FORMAT_ADM = "adm";
 
-    private static Map<String, String> initializeFormatParserFactoryMap() {
-        Map<String, String> map = new HashMap<String, String>();
+    private static Map<String, Object> initializeFormatParserFactoryMap() {
+        Map<String, Object> map = new HashMap<String, Object>();
         map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
         map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
         return map;
@@ -77,7 +77,7 @@
      * @param attribute
      *            The attribute whose value needs to be obtained.
      */
-    public String getAdapterProperty(String attribute) {
+    public Object getAdapterProperty(String attribute) {
         return configuration.get(attribute);
     }
 
@@ -86,7 +86,7 @@
      * 
      * @return A Map<String,Object> instance representing the adapter configuration.
      */
-    public Map<String, String> getConfiguration() {
+    public Map<String, Object> getConfiguration() {
         return configuration;
     }
 
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..8976f7a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -70,9 +70,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..8ab252d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -69,7 +69,7 @@
     public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
 
     @Override
-    public abstract void configure(Map<String, String> arguments) throws Exception;
+    public abstract void configure(Map<String, Object> arguments) throws Exception;
 
     @Override
     public abstract AdapterType getAdapterType();
@@ -82,14 +82,14 @@
     }
 
     protected void configureFormat() throws Exception {
-        String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
+        String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
         if (parserFactoryClassname == null) {
-            String specifiedFormat = configuration.get(KEY_FORMAT);
+            String specifiedFormat = (String) configuration.get(KEY_FORMAT);
             if (specifiedFormat == null) {
                 throw new IllegalArgumentException(" Unspecified data format");
             } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
                 parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
-            } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+            } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
                 parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
             } else {
                 throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e05b2f..02eacf5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -14,18 +14,9 @@
  */
 package edu.uci.ics.asterix.external.dataset.adapter;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -37,14 +28,9 @@
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
 
 /**
  * Provides functionality for fetching external data stored in an HDFS instance.
@@ -53,118 +39,29 @@
 public class HDFSAdapter extends FileSystemBasedAdapter {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
 
-    public static final String KEY_HDFS_URL = "hdfs";
-    public static final String KEY_INPUT_FORMAT = "input-format";
-    public static final String INPUT_FORMAT_TEXT = "text-input-format";
-    public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
-    private Object[] inputSplits;
+    private transient String[] readSchedule;
+    private transient boolean executed[];
+    private transient InputSplit[] inputSplits;
     private transient JobConf conf;
-    private InputSplitsProxy inputSplitsProxy;
-    private static final Map<String, String> formatClassNames = initInputFormatMap();
+    private transient AlgebricksPartitionConstraint clusterLocations;
 
-    private static Map<String, String> initInputFormatMap() {
-        Map<String, String> formatClassNames = new HashMap<String, String>();
-        formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
-        formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
-        return formatClassNames;
-    }
+    private transient String nodeName;
 
-    public HDFSAdapter(IAType atype) {
+    public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+            AlgebricksPartitionConstraint clusterLocations) {
         super(atype);
+        this.readSchedule = readSchedule;
+        this.executed = executed;
+        this.inputSplits = inputSplits;
+        this.conf = conf;
+        this.clusterLocations = clusterLocations;
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        configuration = arguments;
+    public void configure(Map<String, Object> arguments) throws Exception {
+        this.configuration = arguments;
         configureFormat();
-        configureJobConf();
-        configureSplits();
-    }
-
-    private void configureSplits() throws IOException {
-        if (inputSplitsProxy == null) {
-            inputSplits = conf.getInputFormat().getSplits(conf, 0);
-        }
-        inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
-    }
-
-    private void configurePartitionConstraint() throws Exception {
-        List<String> locations = new ArrayList<String>();
-        Random random = new Random();
-        boolean couldConfigureLocationConstraints = false;
-        try {
-            Map<String, Set<String>> nodeControllers = AsterixRuntimeUtil.getNodeControllerMap();
-            for (Object inputSplit : inputSplits) {
-                String[] dataNodeLocations = ((InputSplit) inputSplit).getLocations();
-                if (dataNodeLocations == null || dataNodeLocations.length == 0) {
-                    throw new IllegalArgumentException("No datanode locations found: check hdfs path");
-                }
-
-                // loop over all replicas until a split location coincides
-                // with an asterix datanode location
-                for (String datanodeLocation : dataNodeLocations) {
-                    Set<String> nodeControllersAtLocation = null;
-                    try {
-                        nodeControllersAtLocation = nodeControllers.get(AsterixRuntimeUtil
-                                .getIPAddress(datanodeLocation));
-                    } catch (UnknownHostException uhe) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.log(Level.WARNING, "Unknown host :" + datanodeLocation);
-                        }
-                        continue;
-                    }
-                    if (nodeControllersAtLocation == null || nodeControllersAtLocation.size() == 0) {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.log(Level.WARNING, "No node controller found at " + datanodeLocation
-                                    + " will look at replica location");
-                        }
-                        couldConfigureLocationConstraints = false;
-                    } else {
-                        int locationIndex = random.nextInt(nodeControllersAtLocation.size());
-                        String chosenLocation = (String) nodeControllersAtLocation.toArray()[locationIndex];
-                        locations.add(chosenLocation);
-                        if (LOGGER.isLoggable(Level.INFO)) {
-                            LOGGER.log(Level.INFO, "split : " + inputSplit + " to be processed by :" + chosenLocation);
-                        }
-                        couldConfigureLocationConstraints = true;
-                        break;
-                    }
-                }
-
-                /* none of the replica locations coincides with an Asterix
-                   node controller location.
-                */
-                if (!couldConfigureLocationConstraints) {
-                    List<String> allNodeControllers = AsterixRuntimeUtil.getAllNodeControllers();
-                    int locationIndex = random.nextInt(allNodeControllers.size());
-                    String chosenLocation = allNodeControllers.get(locationIndex);
-                    locations.add(chosenLocation);
-                    if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.log(Level.SEVERE, "No local node controller found to process split : " + inputSplit
-                                + " will be processed by a remote node controller:" + chosenLocation);
-                    }
-                }
-            }
-            partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Encountered exception :" + e + " using count constraints");
-            }
-            partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
-        }
-    }
-
-    private JobConf configureJobConf() throws Exception {
-        conf = new JobConf();
-        conf.set("fs.default.name", configuration.get(KEY_HDFS_URL).trim());
-        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
-        conf.setClassLoader(HDFSAdapter.class.getClassLoader());
-        conf.set("mapred.input.dir", configuration.get(KEY_PATH).trim());
-        conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim()));
-        return conf;
     }
 
     public AdapterType getAdapterType() {
@@ -174,7 +71,7 @@
     @Override
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
-        inputSplits = inputSplitsProxy.toInputSplits(conf);
+        this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
     }
 
     private Reporter getReporter() {
@@ -215,98 +112,124 @@
         return reporter;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public InputStream getInputStream(int partition) throws IOException {
-        try {
-            InputStream inputStream;
-            if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
-                SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-                RecordReader reader = format.getRecordReader(
-                        (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
-                inputStream = new HDFSStream(reader, ctx);
-            } else {
-                try {
+
+        return new InputStream() {
+
+            private RecordReader<Object, Text> reader;
+            private Object key;
+            private Text value;
+            private boolean hasMore = false;
+            private final int EOL = "\n".getBytes()[0];
+            private Text pendingValue = null;
+            private int currentSplitIndex = 0;
+
+            @SuppressWarnings("unchecked")
+            private boolean moveToNext() throws IOException {
+                for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+                    /**
+                     * read all the input splits scheduled to the current node
+                     */
+                    if (readSchedule[currentSplitIndex].equals(nodeName)) {
+                        /**
+                         * pick an unread split to read
+                         * synchronize among simultaneous partitions in the same machine
+                         */
+                        synchronized (executed) {
+                            if (!executed[currentSplitIndex]) {
+                                executed[currentSplitIndex] = true;
+                            } else {
+                                continue;
+                            }
+                        }
+
+                        /**
+                         * read the split
+                         */
+                        reader = getRecordReader(currentSplitIndex);
+                        key = reader.createKey();
+                        value = (Text) reader.createValue();
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            @Override
+            public int read(byte[] buffer, int offset, int len) throws IOException {
+                if (reader == null) {
+                    if (!moveToNext()) {
+                        //nothing to read
+                        return -1;
+                    }
+                }
+
+                int numBytes = 0;
+                if (pendingValue != null) {
+                    System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+                    buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+                    numBytes += pendingValue.getLength() + 1;
+                    pendingValue = null;
+                }
+
+                while (numBytes < len) {
+                    hasMore = reader.next(key, value);
+                    if (!hasMore) {
+                        while (moveToNext()) {
+                            hasMore = reader.next(key, value);
+                            if (hasMore) {
+                                //move to the next non-empty split
+                                break;
+                            }
+                        }
+                    }
+                    if (!hasMore) {
+                        return (numBytes == 0) ? -1 : numBytes;
+                    }
+                    int sizeOfNextTuple = value.getLength() + 1;
+                    if (numBytes + sizeOfNextTuple > len) {
+                        // cannot add tuple to current buffer
+                        // but the reader has already moved past the fetched tuple,
+                        // so we store it and return it in a subsequent read call.
+                        pendingValue = value;
+                        break;
+                    } else {
+                        System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+                        buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+                        numBytes += sizeOfNextTuple;
+                    }
+                }
+                return numBytes;
+            }
+
+            @Override
+            public int read() throws IOException {
+                throw new NotImplementedException("Use read(byte[], int, int)");
+            }
+
+            private RecordReader getRecordReader(int splitIndex) throws IOException {
+                if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+                    SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+                    RecordReader reader = format.getRecordReader(
+                            (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+                    return reader;
+                } else {
                     TextInputFormat format = (TextInputFormat) conf.getInputFormat();
                     RecordReader reader = format.getRecordReader(
-                            (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
-                    inputStream = new HDFSStream(reader, ctx);
-                } catch (FileNotFoundException e) {
-                    throw new HyracksDataException(e);
+                            (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+                    return reader;
                 }
             }
-            return inputStream;
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
+
+        };
 
     }
 
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
-        if (partitionConstraint == null) {
-            configurePartitionConstraint();
-        }
-        return partitionConstraint;
-    }
-
-}
-
-class HDFSStream extends InputStream {
-
-    private RecordReader<Object, Text> reader;
-    private final Object key;
-    private final Text value;
-    private boolean hasMore = false;
-    private static final int EOL = "\n".getBytes()[0];
-    private Text pendingValue = null;
-
-    public HDFSStream(RecordReader<Object, Text> reader, IHyracksTaskContext ctx) throws Exception {
-        this.reader = reader;
-        key = reader.createKey();
-        try {
-            value = (Text) reader.createValue();
-        } catch (ClassCastException cce) {
-            throw new Exception("value is not of type org.apache.hadoop.io.Text"
-                    + " type not supported in sequence file format", cce);
-        }
-    }
-
-    @Override
-    public int read(byte[] buffer, int offset, int len) throws IOException {
-        int numBytes = 0;
-        if (pendingValue != null) {
-            System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
-            buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
-            numBytes += pendingValue.getLength() + 1;
-            pendingValue = null;
-        }
-
-        while (numBytes < len) {
-            hasMore = reader.next(key, value);
-            if (!hasMore) {
-                return (numBytes == 0) ? -1 : numBytes;
-            }
-            int sizeOfNextTuple = value.getLength() + 1;
-            if (numBytes + sizeOfNextTuple > len) {
-                // cannot add tuple to current buffer
-                // but the reader has moved pass the fetched tuple
-                // we need to store this for a subsequent read call.
-                // and return this then.
-                pendingValue = value;
-                break;
-            } else {
-                System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
-                buffer[offset + numBytes + value.getLength()] = (byte) EOL;
-                numBytes += sizeOfNextTuple;
-            }
-        }
-        return numBytes;
-    }
-
-    @Override
-    public int read() throws IOException {
-        throw new NotImplementedException("Use read(byte[], int, int");
+        return clusterLocations;
     }
 
 }
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..5e48834 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,9 @@
 
 import java.util.Map;
 
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,6 +27,7 @@
 /**
  * Provides the functionality of fetching data in form of ADM records from a Hive dataset.
  */
+@SuppressWarnings("deprecation")
 public class HiveAdapter extends AbstractDatasourceAdapter {
 
     private static final long serialVersionUID = 1L;
@@ -37,8 +41,9 @@
 
     private HDFSAdapter hdfsAdapter;
 
-    public HiveAdapter(IAType atype) {
-        this.hdfsAdapter = new HDFSAdapter(atype);
+    public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+            AlgebricksPartitionConstraint clusterLocations) {
+        this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
         this.atype = atype;
     }
 
@@ -48,33 +53,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
-        configuration = arguments;
-        configureHadoopAdapter();
-    }
-
-    private void configureHadoopAdapter() throws Exception {
-        String database = configuration.get(HIVE_DATABASE);
-        String tablePath = null;
-        if (database == null) {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
-        } else {
-            tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
-                    + configuration.get(HIVE_TABLE);
-        }
-        configuration.put(HDFSAdapter.KEY_PATH, tablePath);
-        if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
-            throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
-        }
-
-        if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
-                .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
-            throw new IllegalArgumentException("file input format" + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
-                    + " is not supported");
-        }
-
-        hdfsAdapter = new HDFSAdapter(atype);
-        hdfsAdapter.configure(configuration);
+    public void configure(Map<String, Object> arguments) throws Exception {
+        this.configuration = arguments;
+        this.hdfsAdapter.configure(arguments);
     }
 
     @Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
index b0dc32f..031f34f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
@@ -70,7 +70,7 @@
-     * @return String the value corresponding to the configuration parameter
+     * @return Object the value corresponding to the configuration parameter
      *         represented by the key- attributeKey.
      */
-    public String getAdapterProperty(String propertyKey);
+    public Object getAdapterProperty(String propertyKey);
 
     /**
      * Configures the IDatasourceAdapter instance.
@@ -100,7 +100,7 @@
      *            providing all arguments as a set of (key,value) pairs. These
      *            arguments are put into the metadata.
      */
-    public void configure(Map<String, String> arguments) throws Exception;
+    public void configure(Map<String, Object> arguments) throws Exception;
 
     /**
      * Returns a list of partition constraints. A partition constraint can be a
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index ef39d45..9abc92a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -44,9 +44,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         this.configuration = arguments;
-        String[] splits = arguments.get(KEY_PATH).split(",");
+        String[] splits = ((String) arguments.get(KEY_PATH)).split(",");
         configureFileSplits(splits);
         configureFormat();
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ebfbcad..66d9f98 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -31,9 +31,8 @@
  */
 public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
 
-   
     private static final long serialVersionUID = 1L;
-    
+
     public static final String QUERY = "query";
     public static final String INTERVAL = "interval";
 
@@ -49,7 +48,7 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
         String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
         IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
@@ -69,12 +68,12 @@
     }
 
     @Override
-    public void stop()  {
+    public void stop() {
         tweetClient.stop();
     }
 
     @Override
-    public void alter(Map<String, String> properties)  {
+    public void alter(Map<String, String> properties) {
         alterRequested = true;
         this.alteredParams = properties;
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..06cddfd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -63,8 +63,8 @@
         tupleFieldValues = new String[recordType.getFieldNames().length];
     }
 
-    public void initialize(Map<String, String> params) {
-        this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+    public void initialize(Map<String, Object> params) {
+        this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
         this.query = new Query(keywords);
         query.setRpp(100);
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..ccd6516 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -72,9 +72,9 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         configuration = arguments;
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty == null) {
             throw new IllegalArgumentException("no rss url provided");
         }
@@ -94,7 +94,7 @@
     }
 
     protected void reconfigure(Map<String, String> arguments) {
-        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
         if (rssURLProperty != null) {
             initializeFeedURLs(rssURLProperty);
         }
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
index 2b884ef..abf0420 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
@@ -49,6 +49,7 @@
 
 import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
 import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.Coredump;
 import edu.uci.ics.asterix.event.driver.EventDriver;
 import edu.uci.ics.asterix.event.management.EventrixClient;
 import edu.uci.ics.asterix.event.management.EventUtil;
@@ -223,6 +224,14 @@
         }
         configuration.setStore(stores);
 
+        List<Coredump> coredump = new ArrayList<Coredump>();
+        String coredumpDir = null;
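+        // the coredump directory defaults to the node's log directory, falling back to the cluster-wide one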
+        for (Node node : cluster.getNode()) {
+            coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+            coredump.add(new Coredump(asterixInstanceName + "_" + node.getId(), coredumpDir));
+        }
+        configuration.setCoredump(coredump);
+
         File asterixConfDir = new File(InstallerDriver.getAsterixDir() + File.separator + asterixInstanceName);
         asterixConfDir.mkdirs();
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
index f9f5260..c33d130 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataManager.java
@@ -43,38 +43,29 @@
  * received from the metadata node, to avoid contacting the metadata node
  * repeatedly. We assume that this metadata manager is the only metadata manager
  * in an Asterix cluster. Therefore, no separate cache-invalidation mechanism is
- * needed at this point.
- * Assumptions/Limitations:
- * The metadata subsystem is started during NC Bootstrap start, i.e., when
- * Asterix is deployed.
- * The metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix
- * is undeployed.
- * The metadata subsystem consists of the MetadataManager and the MatadataNode.
- * The MetadataManager provides users access to the metadata.
- * The MetadataNode implements direct access to the storage layer on behalf of
- * the MetadataManager, and translates the binary representation of ADM into
- * Java objects for consumption by the MetadataManager's users.
- * There is exactly one instance of the MetadataManager and of the MetadataNode
- * in the cluster, which may or may not be co-located on the same machine (or in
- * the same JVM).
- * The MetadataManager exists in the same JVM as its user's (e.g., the query
- * compiler).
- * The MetadataNode exists in the same JVM as it's transactional components
- * (LockManager, LogManager, etc.)
- * Users shall access the metadata only through the MetadataManager, and never
- * via the MetadataNode directly.
+ * needed at this point. Assumptions/Limitations: The metadata subsystem is
+ * started during NC Bootstrap start, i.e., when Asterix is deployed. The
+ * metadata subsystem is destroyed in NC Bootstrap end, i.e., when Asterix is
+ * undeployed. The metadata subsystem consists of the MetadataManager and the
+ * MetadataNode. The MetadataManager provides users access to the metadata. The
+ * MetadataNode implements direct access to the storage layer on behalf of the
+ * MetadataManager, and translates the binary representation of ADM into Java
+ * objects for consumption by the MetadataManager's users. There is exactly one
+ * instance of the MetadataManager and of the MetadataNode in the cluster, which
+ * may or may not be co-located on the same machine (or in the same JVM). The
+ * MetadataManager exists in the same JVM as its users (e.g., the query
+ * compiler). The MetadataNode exists in the same JVM as its transactional
+ * components (LockManager, LogManager, etc.). Users shall access the metadata
+ * only through the MetadataManager, and never via the MetadataNode directly.
  * Multiple threads may issue requests to the MetadataManager concurrently. For
  * the sake of accessing metadata, we assume a transaction consists of one
- * thread.
- * Users are responsible for locking the metadata (using the MetadataManager
- * API) before issuing requests.
- * The MetadataNode is responsible for acquiring finer-grained locks on behalf
- * of requests from the MetadataManager. Currently, locks are acquired per
- * BTree, since the BTree does not acquire even finer-grained locks yet
- * internally.
- * The metadata can be queried with AQL DML like any other dataset, but can only
- * be changed with AQL DDL.
- * The transaction ids for metadata transactions must be unique across the
+ * thread. Users are responsible for locking the metadata (using the
+ * MetadataManager API) before issuing requests. The MetadataNode is responsible
+ * for acquiring finer-grained locks on behalf of requests from the
+ * MetadataManager. Currently, locks are acquired per BTree, since the BTree
+ * does not acquire even finer-grained locks yet internally. The metadata can be
+ * queried with AQL DML like any other dataset, but can only be changed with AQL
+ * DDL. The transaction ids for metadata transactions must be unique across the
  * cluster, i.e., metadata transaction ids shall never "accidentally" overlap
  * with transaction ids of regular jobs or other metadata transactions.
  */
@@ -220,7 +211,7 @@
 
     @Override
     public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
-        // add dataset into metadataNode 
+        // add dataset into metadataNode
         try {
             metadataNode.addDataset(ctx.getJobId(), dataset);
         } catch (RemoteException e) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 998eac4..3993719 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -37,6 +37,7 @@
 import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
@@ -105,6 +106,7 @@
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -123,6 +125,7 @@
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -159,6 +162,7 @@
     private final AsterixStorageProperties storageProperties;
 
     private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+    private static Scheduler hdfsScheduler;
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -180,6 +184,16 @@
         this.defaultDataverse = defaultDataverse;
         this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
         this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        try {
+            if (hdfsScheduler == null) {
+                //set the singleton hdfs scheduler
+                hdfsScheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
+                        .getClusterControllerInfo().getClientNetPort());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void setJobId(JobId jobId) {
@@ -345,8 +359,8 @@
                 adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
             }
 
-            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties(),
-                    itemType);
+            adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+                    wrapProperties(datasetDetails.getProperties()), itemType);
         } catch (AlgebricksException ae) {
             throw ae;
         } catch (Exception e) {
@@ -364,7 +378,7 @@
         RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
         ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
-                adapterFactoryClassname, datasetDetails.getProperties(), rt, scannerDesc);
+                wrapPropertiesEmpty(datasetDetails.getProperties()), rt, scannerDesc, adapterFactory);
 
         AlgebricksPartitionConstraint constraint;
         try {
@@ -429,14 +443,15 @@
             }
 
             if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
-                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties());
+                adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(wrapProperties(datasetDetails
+                        .getProperties()));
                 adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
             } else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
                 String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
                 adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
                         outputTypeName).getDatatype();
                 adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
-                        datasetDetails.getProperties(), adapterOutputType);
+                        wrapProperties(datasetDetails.getProperties()), adapterOutputType);
             } else {
                 throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
             }
@@ -453,7 +468,8 @@
 
         FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
                 dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
-                datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+                this.wrapPropertiesEmpty(datasetDetails.getProperties()), (ARecordType) adapterOutputType, feedDesc,
+                adapterFactory);
 
         AlgebricksPartitionConstraint constraint = null;
         try {
@@ -1479,4 +1495,32 @@
         return FormatUtils.getDefaultFormat();
     }
 
+    /**
+     * Adds the HDFS scheduler and the cluster location constraint to the dataset properties
+     * 
+     * @param properties
+     *            the original dataset properties
+     * @return a new map containing the original dataset properties and the scheduler/locations
+     */
+    private Map<String, Object> wrapProperties(Map<String, String> properties) {
+        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+        wrappedProperties.putAll(properties);
+        wrappedProperties.put(HDFSAdapterFactory.SCHEDULER, hdfsScheduler);
+        wrappedProperties.put(HDFSAdapterFactory.CLUSTER_LOCATIONS, getClusterLocations());
+        return wrappedProperties;
+    }
+
+    /**
+     * Adapt the original properties to a string-object map
+     * 
+     * @param properties
+     *            the original properties
+     * @return the new string-object map
+     */
+    private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+        Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+        wrappedProperties.putAll(properties);
+        return wrappedProperties;
+    }
+
 }
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 84b989d..afdf343 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,7 +19,6 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
 import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
 import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
@@ -47,48 +46,15 @@
         IManagedFeedAdapter {
 
     private static final long serialVersionUID = 1L;
+    private FileSystemBasedAdapter coreAdapter;
+    private String format;
 
-    public static final String KEY_FILE_SYSTEM = "fs";
-    public static final String LOCAL_FS = "localfs";
-    public static final String HDFS = "hdfs";
-
-    private final FileSystemBasedAdapter coreAdapter;
-    private final Map<String, String> configuration;
-    private final String fileSystem;
-    private final String format;
-
-    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration) throws Exception {
+    public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+            FileSystemBasedAdapter coreAdapter, String format) throws Exception {
         super(atype);
-        checkRequiredArgs(configuration);
-        fileSystem = configuration.get(KEY_FILE_SYSTEM);
-        String adapterFactoryClass = null;
-        if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
-        } else if (fileSystem.equals(HDFS)) {
-            adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
-        } else {
-            throw new AsterixException("Unsupported file system type " + fileSystem);
-        }
-        format = configuration.get(KEY_FORMAT);
-        IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(
-                adapterFactoryClass).newInstance();
-        coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, atype);
         this.configuration = configuration;
-    }
-
-    private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
-        if (configuration.get(KEY_FILE_SYSTEM) == null) {
-            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
-        }
-        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
-            throw new Exception("Record type not specified (output-type-name=?)");
-        }
-        if (configuration.get(KEY_PATH) == null) {
-            throw new Exception("File path not specified (path=?)");
-        }
-        if (configuration.get(KEY_FORMAT) == null) {
-            throw new Exception("File format not specified (format=?)");
-        }
+        this.coreAdapter = coreAdapter;
+        this.format = format;
     }
 
     @Override
@@ -103,7 +69,7 @@
     }
 
     @Override
-    public void configure(Map<String, String> arguments) throws Exception {
+    public void configure(Map<String, Object> arguments) throws Exception {
         coreAdapter.configure(arguments);
     }
 
@@ -189,16 +155,16 @@
 
     private final ARecordType recordType;
     private final IDataParser dataParser;
-    private final Map<String, String> configuration;
+    private final Map<String, Object> configuration;
 
     public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
-            char fieldDelimiter, Map<String, String> configuration) {
+            char fieldDelimiter, Map<String, Object> configuration) {
         this.recordType = recordType;
         dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
         this.configuration = configuration;
     }
 
-    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
+    public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
         this.recordType = recordType;
         dataParser = new ADMDataParser();
         this.configuration = configuration;
@@ -221,10 +187,10 @@
     public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
 
     public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
-            Map<String, String> configuration) {
+            Map<String, Object> configuration) {
         super(ctx, recType);
         this.dataParser = dataParser;
-        String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+        String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
         if (propValue != null) {
             interTupleInterval = Long.parseLong(propValue);
         } else {
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..bf1c268 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,7 +16,9 @@
 
 import java.util.Map;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
 import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
@@ -28,10 +30,37 @@
  * source file has been ingested.
  */
 public class RateControlledFileSystemBasedAdapterFactory implements IGenericDatasetAdapterFactory {
+    private static final long serialVersionUID = 1L;
+    
+    public static final String KEY_FILE_SYSTEM = "fs";
+    public static final String LOCAL_FS = "localfs";
+    public static final String HDFS = "hdfs";
+    public static final String KEY_PATH = "path";
+    public static final String KEY_FORMAT = "format";
+
+    private IGenericDatasetAdapterFactory adapterFactory;
+    private String format;
+    private boolean setup = false;
 
     @Override
-    public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
-        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration);
+    public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType type) throws Exception {
+        if (!setup) {
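+            // lazily resolve the delegate adapter factory (localfs or hdfs) on the first call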
+            checkRequiredArgs(configuration);
+            String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
+            String adapterFactoryClass = null;
+            if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
+            } else if (fileSystem.equalsIgnoreCase(HDFS)) {
+                adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
+            } else {
+                throw new AsterixException("Unsupported file system type " + fileSystem);
+            }
+            format = (String) configuration.get(KEY_FORMAT);
+            adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+            setup = true;
+        }
+        return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration,
+                (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, type), format);
     }
 
     @Override
@@ -39,4 +68,19 @@
         return "file_feed";
     }
 
+    private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+        if (configuration.get(KEY_FILE_SYSTEM) == null) {
+            throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
+        }
+        if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
+            throw new Exception("Record type not specified (output-type-name=?)");
+        }
+        if (configuration.get(KEY_PATH) == null) {
+            throw new Exception("File path not specified (path=?)");
+        }
+        if (configuration.get(KEY_FORMAT) == null) {
+            throw new Exception("File format not specified (format=?)");
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
index d5e525a..d583352 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/DatasetLockInfo.java
@@ -437,6 +437,20 @@
 
         return s.toString();
     }
+    
+    public String coreDump() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\t firstUpgrader: " + firstUpgrader);
+        sb.append("\n\t firstWaiter: " + firstWaiter);
+        sb.append("\n\t lastHolder: " + lastHolder);
+        sb.append("\n\t ISCount: " + ISCount);
+        sb.append("\n\t IXCount: " + IXCount);
+        sb.append("\n\t SCount: " + SCount);
+        sb.append("\n\t XCount: " + XCount);
+        sb.append("\n\t entityResourceHT");
+        sb.append(entityResourceHT.prettyPrint());
+        return sb.toString();
+    }
 
     /////////////////////////////////////////////////////////
     //  set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
index 5d81e8a..206a3bf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityInfoManager.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
@@ -272,29 +274,35 @@
             if (child.isDeinitialized()) {
                 continue;
             }
-            s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
-            s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
-            s.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
-                    .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
-            for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
-                s.append(j).append(": ");
-                s.append("\t" + child.getJobId(j));
-                s.append("\t" + child.getDatasetId(j));
-                s.append("\t" + child.getPKHashVal(j));
-                s.append("\t" + child.getDatasetLockMode(j));
-                s.append("\t" + child.getDatasetLockCount(j));
-                s.append("\t" + child.getEntityLockMode(j));
-                s.append("\t" + child.getEntityLockCount(j));
-                s.append("\t" + child.getNextEntityActor(j));
-                s.append("\t" + child.getPrevJobResource(j));
-                s.append("\t" + child.getNextJobResource(j));
-                //s.append("\t" + child.getNextDatasetActor(j));
-                s.append("\n");
-            }
-            s.append("\n");
+            s.append("child[" + i + "]");
+            s.append(child.prettyPrint());
         }
         return s.toString();
     }
+    
+    public void coreDump(OutputStream os) {
+        StringBuilder sb = new StringBuilder("\n\t########### EntityInfoManager Status #############\n");
+        int size = pArray.size();
+        ChildEntityInfoArrayManager child;
+
+        sb.append("Number of Children: " + size + "\n");
+        for (int i = 0; i < size; i++) {
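+            // write each child incrementally and reset the builder so the dump stays memory-bounded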
+            try {
+                child = pArray.get(i);
+                sb.append("child[" + i + "]");
+                sb.append(child.prettyPrint());
+                
+                os.write(sb.toString().getBytes());
+            } catch (IOException e) {
+                //ignore IOException
+            }
+            sb = new StringBuilder();
+        }
+    }
+    
+    public int getShrinkTimerThreshold() {
+        return SHRINK_TIMER_THRESHOLD;
+    }
 
     public void initEntityInfo(int slotNum, int jobId, int datasetId, int PKHashVal, byte lockMode) {
         pArray.get(slotNum / ChildEntityInfoArrayManager.NUM_OF_SLOTS).initEntityInfo(
@@ -567,6 +575,29 @@
     public int getFreeSlotNum() {
         return freeSlotNum;
     }
+    
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+        sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+        sb.append("\tjid\t").append("did\t").append("PK\t").append("DLM\t").append("DLC\t").append("ELM\t")
+                .append("ELC\t").append("NEA\t").append("PJR\t").append("NJR\n");
+        for (int j = 0; j < ChildEntityInfoArrayManager.NUM_OF_SLOTS; j++) {
+            sb.append(j).append(": ");
+            sb.append("\t" + getJobId(j));
+            sb.append("\t" + getDatasetId(j));
+            sb.append("\t" + getPKHashVal(j));
+            sb.append("\t" + getDatasetLockMode(j));
+            sb.append("\t" + getDatasetLockCount(j));
+            sb.append("\t" + getEntityLockMode(j));
+            sb.append("\t" + getEntityLockCount(j));
+            sb.append("\t" + getNextEntityActor(j));
+            sb.append("\t" + getPrevJobResource(j));
+            sb.append("\t" + getNextJobResource(j));
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
 
     //////////////////////////////////////////////////////////////////
     //   set/get method for each field of EntityInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
index ca00aa2..2fae460 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/EntityLockInfoManager.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
@@ -271,22 +273,35 @@
             if (child.isDeinitialized()) {
                 continue;
             }
-            s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
-            s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
-            s.append("\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
-            for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
-                s.append(j).append(": ");
-                s.append("\t" + child.getXCount(j));
-                s.append("\t" + child.getSCount(j));
-                s.append("\t" + child.getLastHolder(j));
-                s.append("\t" + child.getFirstWaiter(j));
-                s.append("\t" + child.getUpgrader(j));
-                s.append("\n");
-            }
-            s.append("\n");
+            s.append("child[" + i + "]");
+            s.append(child.prettyPrint());
         }
         return s.toString();
     }
+    
+    public void coreDump(OutputStream os) {
+        StringBuilder sb = new StringBuilder("\n\t########### EntityLockInfoManager Status #############\n");
+        int size = pArray.size();
+        ChildEntityLockInfoArrayManager child;
+
+        sb.append("Number of Children: " + size + "\n");
+        for (int i = 0; i < size; i++) {
+            try {
+                child = pArray.get(i);
+                sb.append("child[" + i + "]");
+                sb.append(child.prettyPrint());
+                
+                os.write(sb.toString().getBytes());
+            } catch (IOException e) {
+                //ignore IOException
+            }
+            sb = new StringBuilder();
+        }
+    }
+    
+    public int getShrinkTimerThreshold() {
+        return SHRINK_TIMER_THRESHOLD;
+    }
 
     //debugging method
     public String printWaiters(int slotNum) {
@@ -736,6 +751,23 @@
     public int getFreeSlotNum() {
         return freeSlotNum;
     }
+    
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+        sb.append("\n\tfreeSlotNum:" + getFreeSlotNum());
+        sb.append("\n\tX\t").append("S\t").append("LH\t").append("FW\t").append("UP\n");
+        for (int j = 0; j < ChildEntityLockInfoArrayManager.NUM_OF_SLOTS; j++) {
+            sb.append(j).append(": ");
+            sb.append("\t" + getXCount(j));
+            sb.append("\t" + getSCount(j));
+            sb.append("\t" + getLastHolder(j));
+            sb.append("\t" + getFirstWaiter(j));
+            sb.append("\t" + getUpgrader(j));
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
 
     //////////////////////////////////////////////////////////////////
     //   set/get method for each field of EntityLockInfo plus freeSlot
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
index d846603..8bf5304 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/JobInfo.java
@@ -275,6 +275,17 @@
         }
         return s.toString();
     }
+    
+    public String coreDump() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\t datasetISLockHT");
+        sb.append(datasetISLockHT.prettyPrint());
+        sb.append("\n\t firstWaitingResource: " + firstWaitingResource);
+        sb.append("\n\t lastHoldingResource: " + lastHoldingResource);
+        sb.append("\n\t upgradingResource: " + upgradingResource);
+        sb.append("\n\t jobCtx.jobId: " + jobCtx.getJobId());
+        return sb.toString();
+    }
 
     /////////////////////////////////////////////////////////
     //  set/get method for private variable
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index dc6f111..96e95d9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -15,10 +15,13 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -33,6 +36,7 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
 /**
  * An implementation of the ILockManager interface for the
@@ -42,7 +46,7 @@
  * @author pouria, kisskys
  */
 
-public class LockManager implements ILockManager {
+public class LockManager implements ILockManager, ILifeCycleComponent {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     //This variable indicates that the dataset granule X lock request is allowed when 
@@ -2050,6 +2054,182 @@
             unlatchLockTable();
         }
     }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+
+            //#. dump Configurable Variables
+            dumpConfVars(os);
+
+            //#. dump jobHT
+            dumpJobInfo(os);
+
+            //#. dump datasetResourceHT
+            dumpDatasetLockInfo(os);
+
+            //#. dump entityLockInfoManager
+            dumpEntityLockInfo(os);
+
+            //#. dump entityInfoManager
+            dumpEntityInfo(os);
+
+            //#. dump lockWaiterManager
+            dumpLockWaiterInfo(os);
+
+            try {
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+
+    private void dumpConfVars(OutputStream os) {
+        try {
+            StringBuilder sb = new StringBuilder();
+            sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+            sb.append("\nESCALATE_TRHESHOLD_ENTITY_TO_DATASET: " + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+            sb.append("\nSHRINK_TIMER_THRESHOLD (entityLockInfoManager): "
+                    + entityLockInfoManager.getShrinkTimerThreshold());
+            sb.append("\nSHRINK_TIMER_THRESHOLD (entityInfoManager): " + entityInfoManager.getShrinkTimerThreshold());
+            sb.append("\nSHRINK_TIMER_THRESHOLD (lockWaiterManager): " + lockWaiterManager.getShrinkTimerThreshold());
+            sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpJobInfo(OutputStream os) {
+        JobId jobId;
+        JobInfo jobInfo;
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("\n>>dump_begin\t>>----- [JobInfo] -----");
+            Set<Map.Entry<JobId, JobInfo>> entrySet = jobHT.entrySet();
+            if (entrySet != null) {
+                for (Map.Entry<JobId, JobInfo> entry : entrySet) {
+                    if (entry != null) {
+                        jobId = entry.getKey();
+                        if (jobId != null) {
+                            sb.append("\n" + jobId);
+                        } else {
+                            sb.append("\nJID:null");
+                        }
+
+                        jobInfo = entry.getValue();
+                        if (jobInfo != null) {
+                            sb.append(jobInfo.coreDump());
+                        } else {
+                            sb.append("\nJobInfo:null");
+                        }
+                    }
+                }
+            }
+            sb.append("\n>>dump_end\t>>----- [JobInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpDatasetLockInfo(OutputStream os) {
+        DatasetId datasetId;
+        DatasetLockInfo datasetLockInfo;
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("\n>>dump_begin\t>>----- [DatasetLockInfo] -----");
+            Set<Map.Entry<DatasetId, DatasetLockInfo>> entrySet = datasetResourceHT.entrySet();
+            if (entrySet != null) {
+                for (Map.Entry<DatasetId, DatasetLockInfo> entry : entrySet) {
+                    if (entry != null) {
+                        datasetId = entry.getKey();
+                        if (datasetId != null) {
+                            sb.append("\nDatasetId:" + datasetId.getId());
+                        } else {
+                            sb.append("\nDatasetId:null");
+                        }
+
+                        datasetLockInfo = entry.getValue();
+                        if (datasetLockInfo != null) {
+                            sb.append(datasetLockInfo.coreDump());
+                        } else {
+                            sb.append("\nDatasetLockInfo:null");
+                        }
+                    }
+                    os.write(sb.toString().getBytes());
+
+                    //create a new sb to avoid possible OOM exception
+                    sb = new StringBuilder();
+                }
+            }
+            //write the end marker once, after all entries have been dumped
+            sb.append("\n>>dump_end\t>>----- [DatasetLockInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpEntityLockInfo(OutputStream os) {
+        try {
+            //write the begin marker first, since coreDump() writes to the stream directly
+            os.write("\n>>dump_begin\t>>----- [EntityLockInfo] -----".getBytes());
+            entityLockInfoManager.coreDump(os);
+            os.write("\n>>dump_end\t>>----- [EntityLockInfo] -----\n".getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpEntityInfo(OutputStream os) {
+        try {
+            //write the begin marker first, since coreDump() writes to the stream directly
+            os.write("\n>>dump_begin\t>>----- [EntityInfo] -----".getBytes());
+            entityInfoManager.coreDump(os);
+            os.write("\n>>dump_end\t>>----- [EntityInfo] -----\n".getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpLockWaiterInfo(OutputStream os) {
+        try {
+            //write the begin marker first, since coreDump() writes to the stream directly
+            os.write("\n>>dump_begin\t>>----- [LockWaiterInfo] -----".getBytes());
+            lockWaiterManager.coreDump(os);
+            os.write("\n>>dump_end\t>>----- [LockWaiterInfo] -----\n".getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
 }
 
 class ConsecutiveWakeupContext {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
index bd414de..c6fcbd5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockWaiterManager.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.service.locking;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 
 /**
@@ -258,28 +260,42 @@
         StringBuilder s = new StringBuilder("\n########### LockWaiterManager Status #############\n");
         int size = pArray.size();
         ChildLockWaiterArrayManager child;
-        LockWaiter waiter;
 
         for (int i = 0; i < size; i++) {
             child = pArray.get(i);
             if (child.isDeinitialized()) {
                 continue;
             }
-            s.append("child[" + i + "]: occupiedSlots:" + child.getNumOfOccupiedSlots());
-            s.append(" freeSlotNum:" + child.getFreeSlotNum() + "\n");
-            for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
-                waiter = child.getLockWaiter(j);
-                s.append(j).append(": ");
-                s.append("\t" + waiter.getEntityInfoSlot());
-                s.append("\t" + waiter.needWait());
-                s.append("\t" + waiter.isVictim());
-                s.append("\n");
-            }
-            s.append("\n");
+            s.append("child[" + i + "]");
+            s.append(child.prettyPrint());
         }
         return s.toString();
     }
     
+    public void coreDump(OutputStream os) {
+        StringBuilder sb = new StringBuilder("\n########### LockWaiterManager Status #############\n");
+        int size = pArray.size();
+        ChildLockWaiterArrayManager child;
+
+        sb.append("Number of Children: " + size + "\n");
+        for (int i = 0; i < size; i++) {
+            try {
+                child = pArray.get(i);
+                sb.append("child[" + i + "]");
+                sb.append(child.prettyPrint());
+                
+                os.write(sb.toString().getBytes());
+            } catch (IOException e) {
+                //ignore IOException
+            }
+            sb = new StringBuilder();
+        }
+    }
+    
+    public int getShrinkTimerThreshold() {
+        return SHRINK_TIMER_THRESHOLD;
+    }
+    
     public LockWaiter getLockWaiter(int slotNum) {
         return pArray.get(slotNum / ChildLockWaiterArrayManager.NUM_OF_SLOTS).getLockWaiter(
                 slotNum % ChildLockWaiterArrayManager.NUM_OF_SLOTS);
@@ -364,4 +380,20 @@
     public int getFreeSlotNum() {
         return freeSlotNum;
     }
+    
+    public String prettyPrint() {
+        LockWaiter waiter;
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n\toccupiedSlots:" + getNumOfOccupiedSlots());
+        sb.append("\n\tfreeSlotNum:" + getFreeSlotNum() + "\n");
+        for (int j = 0; j < ChildLockWaiterArrayManager.NUM_OF_SLOTS; j++) {
+            waiter = getLockWaiter(j);
+            sb.append(j).append(": ");
+            sb.append("\t" + waiter.getEntityInfoSlot());
+            sb.append("\t" + waiter.needWait());
+            sb.append("\t" + waiter.isVictim());
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
 }
\ No newline at end of file
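coreDump above deliberately issues one os.write per child rather than accumulating the whole dump in memory, so the dump's footprint stays bounded no matter how many children exist. A hedged caller sketch, using only the coreDump(OutputStream) method added by this patch (the class name and dump path are illustrative):

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class LockWaiterDumpExample {
        //assumes an existing LockWaiterManager instance; the path is illustrative.
        static void dumpTo(LockWaiterManager manager, String path) throws IOException {
            OutputStream os = new FileOutputStream(path);
            try {
                manager.coreDump(os); //streams child state incrementally
            } finally {
                os.close();
            }
        }
    }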
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
index a53c890..05052f0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/TimeOutDetector.java
@@ -2,6 +2,7 @@
 
 import java.util.LinkedList;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 
 /**
@@ -26,7 +27,7 @@
         this.lockMgr = lockMgr;
         this.trigger = new Thread(new TimeoutTrigger(this));
         trigger.setDaemon(true);
-        trigger.start();
+        AsterixThreadExecutor.INSTANCE.execute(trigger);
     }
 
     public void sweep() throws ACIDException {
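The trigger thread is no longer started directly; it is handed to AsterixThreadExecutor.INSTANCE so that all transaction-subsystem threads are created through one place the runtime controls. The real AsterixThreadExecutor is not part of this diff; assuming it is a simple singleton that delegates to a java.util.concurrent pool, a sketch might look like:

    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    //hypothetical sketch; the actual AsterixThreadExecutor implementation may differ.
    public final class ThreadExecutorSketch implements Executor {
        public static final ThreadExecutorSketch INSTANCE = new ThreadExecutorSketch();
        private final ExecutorService service = Executors.newCachedThreadPool();

        private ThreadExecutorSketch() {
        }

        @Override
        public void execute(Runnable command) {
            //a Thread passed here runs as a plain Runnable on a pool thread;
            //its own start() is never called, so setDaemon() on the trigger
            //no longer controls the thread that actually executes it.
            service.execute(command);
        }
    }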
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index b51a767..d4f874b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -17,6 +17,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -32,6 +33,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.api.AsterixThreadExecutor;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.transactions.FileBasedBuffer;
 import edu.uci.ics.asterix.common.transactions.FileUtil;
@@ -49,8 +51,9 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
-public class LogManager implements ILogManager {
+public class LogManager implements ILogManager, ILifeCycleComponent {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
@@ -185,7 +188,7 @@
          */
         logPageFlusher = new LogPageFlushThread(this);
         logPageFlusher.setDaemon(true);
-        logPageFlusher.start();
+        AsterixThreadExecutor.INSTANCE.execute(logPageFlusher);
     }
 
     public int getLogPageIndex(long lsnValue) {
@@ -765,6 +768,60 @@
 
         map.clear();
     }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+            //#. dump Configurable Variables
+            dumpConfVars(os);
+
+            //#. dump LSNInfo
+            dumpLSNInfo(os);
+
+            try {
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+
+    private void dumpConfVars(OutputStream os) {
+        try {
+            StringBuilder sb = new StringBuilder();
+            sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+            sb.append(logManagerProperties.toString());
+            sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void dumpLSNInfo(OutputStream os) {
+        try {
+            StringBuilder sb = new StringBuilder();
+            sb.append("\n>>dump_begin\t>>----- [LSNInfo] -----");
+            sb.append("\nstartingLSN: " + startingLSN);
+            sb.append("\ncurrentLSN: " + lsn.get());
+            sb.append("\nlastFlushedLSN: " + lastFlushedLSN.get());
+            sb.append("\n>>dump_end\t>>----- [LSNInfo] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
 }
 
 /*
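With LogManager now an ILifeCycleComponent, shutdown code can ask it to dump its configuration and LSN state while stopping. A hedged usage sketch, assuming an already-constructed LogManager and an illustrative dump-file path:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    class LogManagerStopExample {
        static void stopWithDump(LogManager logManager) throws IOException {
            OutputStream os = new FileOutputStream("logmanager-dump.txt"); //illustrative path
            try {
                //true => write the [ConfVars] and [LSNInfo] sections before stopping.
                logManager.stop(true, os);
            } finally {
                os.close();
            }
        }
    }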
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index b262fab..ee27061 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,6 +58,7 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
@@ -83,7 +85,7 @@
  * not in place completely. Once we have physical logging implemented, we would
  * add support for crash recovery.
  */
-public class RecoveryManager implements IRecoveryManager {
+public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
@@ -782,6 +784,16 @@
                     + undoCount);
         }
     }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        //no op
+    }
 }
 
 class TxnId {
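RecoveryManager implements the same interface with no-op hooks, which pins down the contract shared by every class in this patch. Judging from the start()/stop(boolean, OutputStream) overrides, ILifeCycleComponent presumably has this shape (inferred from this patch, not copied from hyracks-api, which may declare additional members):

    package edu.uci.ics.hyracks.api.lifecycle;

    import java.io.OutputStream;

    //inferred from the @Override methods in this patch.
    public interface ILifeCycleComponent {
        void start();

        void stop(boolean dumpState, OutputStream os);
    }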
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index d53286a..05ee899 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -202,4 +202,15 @@
         exlusiveJobLevelCommit = true;
     }
 
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n" + jobId + "\n");
+        sb.append("transactionType: " + transactionType);
+        sb.append("firstLogLocator: " + firstLogLocator.getLsn() + "\n");
+        sb.append("lastLogLocator: " + lastLogLocator.getLsn() + "\n");
+        sb.append("TransactionState: " + txnState + "\n");
+        sb.append("startWaitTime: " + startWaitTime + "\n");
+        sb.append("status: " + status + "\n");
+        return sb.toString();
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index c956367..09fa028 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -14,8 +14,11 @@
  */
 package edu.uci.ics.asterix.transaction.management.service.transaction;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -27,15 +30,18 @@
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
 /**
  * An implementation of the @see ITransactionManager interface that provides
  * implementation of APIs for governing the lifecycle of a transaction.
  */
-public class TransactionManager implements ITransactionManager {
+public class TransactionManager implements ITransactionManager, ILifeCycleComponent {
+
+    public static final boolean IS_DEBUG_MODE = false; //set to true to enable debug output
     private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
     private final TransactionSubsystem transactionProvider;
-    private Map<JobId, ITransactionContext> ITransactionContextRepository = new HashMap<JobId, ITransactionContext>();
+    private Map<JobId, ITransactionContext> transactionContextRepository = new HashMap<JobId, ITransactionContext>();
     private AtomicInteger maxJobId = new AtomicInteger(0);
 
     public TransactionManager(TransactionSubsystem provider) {
@@ -62,7 +68,7 @@
             } finally {
                 txnContext.releaseResources();
                 transactionProvider.getLockManager().releaseLocks(txnContext);
-                ITransactionContextRepository.remove(txnContext.getJobId());
+                transactionContextRepository.remove(txnContext.getJobId());
                 txnContext.setTxnState(TransactionState.ABORTED);
             }
         }
@@ -73,7 +79,7 @@
         setMaxJobId(jobId.getId());
         ITransactionContext txnContext = new TransactionContext(jobId, transactionProvider);
         synchronized (this) {
-            ITransactionContextRepository.put(jobId, txnContext);
+            transactionContextRepository.put(jobId, txnContext);
         }
         return txnContext;
     }
@@ -81,13 +87,13 @@
     @Override
     public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
         setMaxJobId(jobId.getId());
-        synchronized (ITransactionContextRepository) {
+        synchronized (transactionContextRepository) {
 
-            ITransactionContext context = ITransactionContextRepository.get(jobId);
+            ITransactionContext context = transactionContextRepository.get(jobId);
             if (context == null) {
-                context = ITransactionContextRepository.get(jobId);
+                //no context exists for this job: create one and register it.
                 context = new TransactionContext(jobId, transactionProvider);
-                ITransactionContextRepository.put(jobId, context);
+                transactionContextRepository.put(jobId, context);
             }
             return context;
         }
@@ -106,7 +112,8 @@
 
             //for entity-level commit
             if (PKHashVal != -1) {
-                boolean countIsZero = transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
+                boolean countIsZero = transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext,
+                        true);
                 if (!countIsZero) {
                     // Lock count != 0 for a particular entity implies that the entity has been locked 
                     // more than once (probably due to a hash collision in our current model).
@@ -139,7 +146,7 @@
             } finally {
                 txnContext.releaseResources();
                 transactionProvider.getLockManager().releaseLocks(txnContext); // release
-                ITransactionContextRepository.remove(txnContext.getJobId());
+                transactionContextRepository.remove(txnContext.getJobId());
                 txnContext.setTxnState(TransactionState.COMMITTED);
             }
         }
@@ -167,4 +174,61 @@
     public int getMaxJobId() {
         return maxJobId.get();
     }
+
+    @Override
+    public void start() {
+        //no op
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream os) {
+        if (dumpState) {
+            //#. dump TxnContext
+            dumpTxnContext(os);
+
+            try {
+                os.flush();
+            } catch (IOException e) {
+                //ignore
+            }
+        }
+    }
+
+    private void dumpTxnContext(OutputStream os) {
+        JobId jobId;
+        ITransactionContext txnCtx;
+        StringBuilder sb = new StringBuilder();
+
+        try {
+            sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
+            Set<Map.Entry<JobId, ITransactionContext>> entrySet = transactionContextRepository.entrySet();
+            if (entrySet != null) {
+                for (Map.Entry<JobId, ITransactionContext> entry : entrySet) {
+                    if (entry != null) {
+                        jobId = entry.getKey();
+                        if (jobId != null) {
+                            sb.append("\n" + jobId);
+                        } else {
+                            sb.append("\nJID:null");
+                        }
+
+                        txnCtx = entry.getValue();
+                        if (txnCtx != null) {
+                            sb.append(txnCtx.prettyPrint());
+                        } else {
+                            sb.append("\nTxnCtx:null");
+                        }
+                    }
+                }
+            }
+
+            sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
+            os.write(sb.toString().getBytes());
+        } catch (Exception e) {
+            //ignore exception and continue dumping as much as possible.
+            if (IS_DEBUG_MODE) {
+                e.printStackTrace();
+            }
+        }
+    }
 }
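
Since every dump section shares the >>dump_begin / >>dump_end markers, a post-mortem tool can split a dump file back into named sections such as [ConfVars], [LSNInfo], [TxnContext], and [LockWaiterInfo]. A small hedged sketch of such a reader (not part of this patch; the class name is illustrative):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class DumpSectionLister {
        //returns the section names found between the [ and ] of each begin banner.
        public static List<String> listSections(String path) throws IOException {
            List<String> sections = new ArrayList<String>();
            BufferedReader reader = new BufferedReader(new FileReader(path));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    int begin = line.indexOf('[');
                    int end = line.indexOf(']');
                    if (line.startsWith(">>dump_begin") && begin >= 0 && end > begin) {
                        sections.add(line.substring(begin + 1, end));
                    }
                }
            } finally {
                reader.close();
            }
            return sections;
        }
    }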