[ASTERIXDB-2008][CLUS] Only add pending removal if node known

[ASTERIXDB-2023][ING] Introduce Enums instead of using bytes

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- Only nodes which are known to cluster manager are added
to the list of nodes pending removal. Other nodes are ignored
- Enums introduced:
  - ActiveEvent.Kind
  - ActivePartitionMessage.Event
- Remove AdapterRuntimeManager
- Remove AdapterExecutor

Change-Id: I7044896559798426c04a3f46861bc5335b25d140
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1921
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 3dd24b6..6568795 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,10 +31,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index fcf2be9..c0717b9 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -23,14 +23,16 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveStatsResponse;
 import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -38,21 +40,20 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.log4j.Logger;
 
 public class ActiveManager {
 
     private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName());
     private static final int SHUTDOWN_TIMEOUT_SECS = 60;
 
-    private final ThreadExecutor executor;
+    private final ExecutorService executor;
     private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes;
     private final ConcurrentFramePool activeFramePool;
     private final String nodeId;
     private final INCServiceContext serviceCtx;
     private volatile boolean shutdown;
 
-    public ActiveManager(ThreadExecutor executor, String nodeId, long activeMemoryBudget, int frameSize,
+    public ActiveManager(ExecutorService executor, String nodeId, long activeMemoryBudget, int frameSize,
             INCServiceContext serviceCtx) throws HyracksDataException {
         this.executor = executor;
         this.nodeId = nodeId;
@@ -86,15 +87,16 @@
     }
 
     public void submit(ActiveManagerMessage message) throws HyracksDataException {
+        LOGGER.log(Level.INFO, "Message of type " + message.getKind() + " received in " + nodeId);
         switch (message.getKind()) {
-            case ActiveManagerMessage.STOP_ACTIVITY:
+            case STOP_ACTIVITY:
                 stopRuntime(message);
                 break;
-            case ActiveManagerMessage.REQUEST_STATS:
+            case REQUEST_STATS:
                 requestStats((StatsRequestMessage) message);
                 break;
             default:
-                LOGGER.warn("Unknown message type received: " + message.getKind());
+                LOGGER.warning("Unknown message type received: " + message.getKind());
         }
     }
 
@@ -104,7 +106,7 @@
             IActiveRuntime runtime = runtimes.get(runtimeId);
             long reqId = message.getReqId();
             if (runtime == null) {
-                LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId);
+                LOGGER.warning("Request stats of a runtime that is not registered " + runtimeId);
                 // Send a failure message
                 ((NodeControllerService) serviceCtx.getControllerService())
                         .sendApplicationMessageToCC(
@@ -124,7 +126,7 @@
     }
 
     public void shutdown() {
-        LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
+        LOGGER.warning("Shutting down ActiveManager on node " + nodeId);
         Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>();
         shutdown = true;
         runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, executor.submit(() -> {
@@ -136,29 +138,29 @@
             try {
                 entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
             } catch (InterruptedException e) {
-                LOGGER.warn("Interrupted waiting to stop runtime: " + entry.getKey());
+                LOGGER.warning("Interrupted waiting to stop runtime: " + entry.getKey());
                 Thread.currentThread().interrupt();
             } catch (ExecutionException e) {
-                LOGGER.warn("Exception while stopping runtime: " + entry.getKey(), e);
+                LOGGER.log(Level.WARNING, "Exception while stopping runtime: " + entry.getKey(), e);
             } catch (TimeoutException e) {
-                LOGGER.warn("Timed out waiting to stop runtime: " + entry.getKey(), e);
+                LOGGER.log(Level.WARNING, "Timed out waiting to stop runtime: " + entry.getKey(), e);
             }
         });
-        LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
+        LOGGER.warning("Shutdown ActiveManager on node " + nodeId + " complete");
     }
 
     private void stopRuntime(ActiveManagerMessage message) {
         ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
         IActiveRuntime runtime = runtimes.get(runtimeId);
         if (runtime == null) {
-            LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId);
+            LOGGER.warning("Request to stop a runtime that is not registered " + runtimeId);
         } else {
             executor.execute(() -> {
                 try {
                     stopIfRunning(runtimeId, runtime);
                 } catch (Exception e) {
                     // TODO(till) Figure out a better way to handle failure to stop a runtime
-                    LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+                    LOGGER.log(Level.WARNING, "Failed to stop runtime: " + runtimeId, e);
                 }
             });
         }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index a7d7796..27ecb52 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -22,6 +22,7 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -36,6 +37,7 @@
     protected final IHyracksTaskContext ctx;
     protected final ActiveManager activeManager;
     /** A unique identifier for the runtime **/
+    protected Thread taskThread;
     protected final ActiveRuntimeId runtimeId;
     private volatile boolean done = false;
 
@@ -85,11 +87,12 @@
     @Override
     public final void initialize() throws HyracksDataException {
         LOGGER.log(Level.INFO, "initialize() called on ActiveSourceOperatorNodePushable");
+        taskThread = Thread.currentThread();
         activeManager.registerRuntime(this);
         try {
             // notify cc that runtime has been registered
             ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null), null);
+                    Event.RUNTIME_REGISTERED, null), null);
             start();
         } catch (InterruptedException e) {
             LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e);
@@ -112,7 +115,7 @@
         activeManager.deregisterRuntime(runtimeId);
         try {
             ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
-                    ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null), null);
+                    Event.RUNTIME_DEREGISTERED, null), null);
         } catch (Exception e) {
             LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e);
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
index 0a36216..de6682d 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
@@ -18,15 +18,12 @@
  */
 package org.apache.asterix.active;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class SingleThreadEventProcessor<T> implements Runnable {
@@ -34,20 +31,20 @@
     private static final Logger LOGGER = Logger.getLogger(SingleThreadEventProcessor.class.getName());
     private final String name;
     private final LinkedBlockingQueue<T> eventInbox;
-    private final ExecutorService executorService;
-    private final Future<?> future;
+    private volatile Thread executorThread;
+    private volatile boolean stopped = false;
 
     public SingleThreadEventProcessor(String threadName) {
         this.name = threadName;
         eventInbox = new LinkedBlockingQueue<>();
-        executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
-        future = executorService.submit(this);
+        executorThread = new Thread(this, threadName);
+        executorThread.start();
     }
 
     @Override
     public final void run() {
         LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
-        while (!Thread.currentThread().isInterrupted()) {
+        while (!stopped) {
             try {
                 T event = eventInbox.take();
                 handle(event);
@@ -69,10 +66,19 @@
     }
 
     public void stop() throws HyracksDataException, InterruptedException {
-        future.cancel(true);
-        executorService.shutdown();
-        if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
-            throw HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+        stopped = true;
+        executorThread.interrupt();
+        executorThread.join(1000);
+        int attempt = 0;
+        while (executorThread.isAlive()) {
+            attempt++;
+            LOGGER.log(Level.WARNING,
+                    "Failed to stop event processor after " + attempt + " attempts. Interrupted exception swallowed?");
+            if (attempt == 10) {
+                throw new RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+            }
+            executorThread.interrupt();
+            executorThread.join(1000);
         }
     }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 9772698..bef418b 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -26,14 +26,16 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ActiveManagerMessage implements INcAddressedMessage {
-    public static final byte STOP_ACTIVITY = 0x00;
-    public static final byte REQUEST_STATS = 0x01;
+    public enum Kind {
+        STOP_ACTIVITY,
+        REQUEST_STATS
+    }
 
     private static final long serialVersionUID = 1L;
-    private final byte kind;
+    private final Kind kind;
     private final Serializable payload;
 
-    public ActiveManagerMessage(byte kind, Serializable payload) {
+    public ActiveManagerMessage(Kind kind, Serializable payload) {
         this.kind = kind;
         this.payload = payload;
     }
@@ -42,7 +44,7 @@
         return payload;
     }
 
-    public byte getKind() {
+    public Kind getKind() {
         return kind;
     }
 
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index a47d5a5..9ace417 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -29,17 +29,19 @@
 import org.apache.hyracks.api.job.JobId;
 
 public class ActivePartitionMessage implements ICcAddressedMessage {
+    public enum Event {
+        RUNTIME_REGISTERED,
+        RUNTIME_DEREGISTERED,
+        GENERIC_EVENT
+    }
 
     private static final long serialVersionUID = 1L;
-    public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
-    public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
-    public static final byte GENERIC_EVENT = 0x02;
     private final ActiveRuntimeId activeRuntimeId;
     private final JobId jobId;
     private final Serializable payload;
-    private final byte event;
+    private final Event event;
 
-    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event, Serializable payload) {
+    public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, Event event, Serializable payload) {
         this.activeRuntimeId = activeRuntimeId;
         this.jobId = jobId;
         this.event = event;
@@ -58,7 +60,7 @@
         return payload;
     }
 
-    public byte getEvent() {
+    public Event getEvent() {
         return event;
     }
 
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
index 8fa5f19..d43f00e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
@@ -24,8 +24,8 @@
     private static final long serialVersionUID = 1L;
     private final long reqId;
 
-    public StatsRequestMessage(byte kind, Serializable payload, long reqId) {
-        super(kind, payload);
+    public StatsRequestMessage(Serializable payload, long reqId) {
+        super(Kind.REQUEST_STATS, payload);
         this.reqId = reqId;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
index 1fea840..18ef143 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java
@@ -18,10 +18,11 @@
  */
 package org.apache.asterix.api.common;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.io.IIOManager;
@@ -84,7 +85,7 @@
     }
 
     @Override
-    public ThreadExecutor getThreadExecutor() {
+    public ExecutorService getThreadExecutor() {
         return asterixAppRuntimeContext.getThreadExecutor();
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 995e372..acb1614 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,8 +38,8 @@
 import org.apache.asterix.active.IRetryPolicy;
 import org.apache.asterix.active.IRetryPolicyFactory;
 import org.apache.asterix.active.NoRetryPolicyFactory;
-import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.StatsRequestMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.IMetadataLockManager;
@@ -68,6 +68,7 @@
 public abstract class ActiveEntityEventsListener implements IActiveEntityController {
 
     private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName());
+    private static final Level level = Level.INFO;
     private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, Kind.STATE_CHANGED, null, null);
     private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING,
             ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING);
@@ -130,7 +131,7 @@
     }
 
     protected synchronized void setState(ActivityState newState) {
-        LOGGER.log(Level.FINE, "State is being set to " + newState + " from " + state);
+        LOGGER.log(level, "State of " + getEntityId() + "is being set to " + newState + " from " + state);
         this.prevState = state;
         this.state = newState;
         if (newState == ActivityState.SUSPENDED) {
@@ -142,7 +143,7 @@
     @Override
     public synchronized void notify(ActiveEvent event) {
         try {
-            LOGGER.fine("EventListener is notified.");
+            LOGGER.log(level, "EventListener is notified.");
             ActiveEvent.Kind eventKind = event.getEventKind();
             switch (eventKind) {
                 case JOB_CREATED:
@@ -172,22 +173,24 @@
     }
 
     protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+        if (message.getEvent() == Event.RUNTIME_REGISTERED) {
             numRegistered++;
             if (numRegistered == locations.getLocations().length) {
                 setState(ActivityState.RUNNING);
             }
-        } else if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) {
+        } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) {
             numRegistered--;
         }
     }
 
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
+        LOGGER.log(level, "the job " + jobId + " finished");
         jobId = null;
         Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject();
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
+        LOGGER.log(level, "The job finished with status: " + jobStatus);
         if (jobStatus.equals(JobStatus.FAILURE)) {
             jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
@@ -271,10 +274,10 @@
     @SuppressWarnings("unchecked")
     @Override
     public void refreshStats(long timeout) throws HyracksDataException {
-        LOGGER.log(Level.FINE, "refreshStats called");
+        LOGGER.log(level, "refreshStats called");
         synchronized (this) {
             if (state != ActivityState.RUNNING || isFetchingStats) {
-                LOGGER.log(Level.FINE,
+                LOGGER.log(level,
                         "returning immediately since state = " + state + " and fetchingStats = " + isFetchingStats);
                 return;
             } else {
@@ -287,8 +290,7 @@
         List<INcAddressedMessage> requests = new ArrayList<>();
         List<String> ncs = Arrays.asList(locations.getLocations());
         for (int i = 0; i < ncs.size(); i++) {
-            requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
-                    new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+            requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
         }
         try {
             List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
@@ -348,32 +350,32 @@
 
     @Override
     public synchronized void recover() throws HyracksDataException {
-        LOGGER.log(Level.FINE, "Recover is called on " + entityId);
+        LOGGER.log(level, "Recover is called on " + entityId);
         if (recoveryTask != null) {
-            LOGGER.log(Level.FINE, "But recovery task for " + entityId + " is already there!! throwing an exception");
+            LOGGER.log(level, "But recovery task for " + entityId + " is already there!! throwing an exception");
             throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
         }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
-            LOGGER.log(Level.FINE, "But it has no recovery policy, so it is set to permanent failure");
+            LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
             setState(ActivityState.PERMANENTLY_FAILED);
         } else {
             ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor();
             IRetryPolicy policy = retryPolicyFactory.create(this);
             cancelRecovery = false;
             setState(ActivityState.TEMPORARILY_FAILED);
-            LOGGER.log(Level.FINE, "Recovery task has been submitted");
+            LOGGER.log(level, "Recovery task has been submitted");
             recoveryTask = executor.submit(() -> doRecover(policy));
         }
     }
 
     protected Void doRecover(IRetryPolicy policy)
             throws AlgebricksException, HyracksDataException, InterruptedException {
-        LOGGER.log(Level.FINE, "Actual Recovery task has started");
+        LOGGER.log(level, "Actual Recovery task has started");
         if (getState() != ActivityState.TEMPORARILY_FAILED) {
-            LOGGER.log(Level.FINE, "but its state is not temp failure and so we're just returning");
+            LOGGER.log(level, "but its state is not temp failure and so we're just returning");
             return null;
         }
-        LOGGER.log(Level.FINE, "calling the policy");
+        LOGGER.log(level, "calling the policy");
         while (policy.retry()) {
             synchronized (this) {
                 if (cancelRecovery) {
@@ -402,7 +404,7 @@
                     doStart(metadataProvider);
                     return null;
                 } catch (Exception e) {
-                    LOGGER.log(Level.WARNING, "Attempt to revive " + entityId + " failed", e);
+                    LOGGER.log(level, "Attempt to revive " + entityId + " failed", e);
                     setState(ActivityState.TEMPORARILY_FAILED);
                     recoverFailure = e;
                 } finally {
@@ -515,10 +517,10 @@
         WaitForStateSubscriber subscriber;
         Future<Void> suspendTask;
         synchronized (this) {
-            LOGGER.log(Level.FINE, "suspending entity " + entityId);
-            LOGGER.log(Level.FINE, "Waiting for ongoing activities");
+            LOGGER.log(level, "suspending entity " + entityId);
+            LOGGER.log(level, "Waiting for ongoing activities");
             waitForNonTransitionState();
-            LOGGER.log(Level.FINE, "Proceeding with suspension. Current state is " + state);
+            LOGGER.log(level, "Proceeding with suspension. Current state is " + state);
             if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) {
                 suspended = true;
                 return;
@@ -536,12 +538,12 @@
                     EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED));
             suspendTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService()
                     .getExecutor().submit(() -> doSuspend(metadataProvider));
-            LOGGER.log(Level.FINE, "Suspension task has been submitted");
+            LOGGER.log(level, "Suspension task has been submitted");
         }
         try {
-            LOGGER.log(Level.FINE, "Waiting for suspension task to complete");
+            LOGGER.log(level, "Waiting for suspension task to complete");
             suspendTask.get();
-            LOGGER.log(Level.FINE, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
+            LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED");
             subscriber.sync();
         } catch (Exception e) {
             synchronized (this) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index c5e5dbb..d36d9b7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -53,7 +53,7 @@
         implements IActiveNotificationHandler, IJobLifecycleListener {
 
     private static final Logger LOGGER = Logger.getLogger(ActiveNotificationHandler.class.getName());
-    private static final boolean DEBUG = false;
+    private static final Level level = Level.INFO;
     public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
     private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners;
     private final Map<JobId, EntityId> jobId2EntityId;
@@ -73,13 +73,13 @@
         EntityId entityId = jobId2EntityId.get(event.getJobId());
         if (entityId != null) {
             IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
-            LOGGER.log(Level.FINE, "Next event is of type " + event.getEventKind());
+            LOGGER.log(level, "Next event is of type " + event.getEventKind());
             if (event.getEventKind() == Kind.JOB_FINISHED) {
-                LOGGER.log(Level.FINE, "Removing the job");
+                LOGGER.log(level, "Removing the job");
                 jobId2EntityId.remove(event.getJobId());
             }
             if (listener != null) {
-                LOGGER.log(Level.FINE, "Notifying the listener");
+                LOGGER.log(level, "Notifying the listener");
                 listener.notify(event);
             }
         } else {
@@ -91,34 +91,30 @@
 
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
-        LOGGER.log(Level.FINE,
+        LOGGER.log(level,
                 "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId);
         Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (property == null || !(property instanceof EntityId)) {
-            LOGGER.log(Level.FINE, "Job is not of type active job. property found to be: " + property);
+            LOGGER.log(level, "Job is not of type active job. property found to be: " + property);
             return;
         }
         EntityId entityId = (EntityId) property;
         monitorJob(jobId, entityId);
         boolean found = jobId2EntityId.get(jobId) != null;
-        LOGGER.log(Level.FINE, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive"));
         add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
     }
 
     private synchronized void monitorJob(JobId jobId, EntityId entityId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
-            boolean found = jobId2EntityId.get(jobId) != null;
-            LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive"));
-        }
+        LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
+        boolean found = jobId2EntityId.get(jobId) != null;
+        LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive"));
         if (entityEventListeners.containsKey(entityId)) {
             if (jobId2EntityId.containsKey(jobId)) {
                 LOGGER.severe("Job is already being monitored for job: " + jobId);
                 return;
             }
-            if (DEBUG) {
-                LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId);
-            }
+            LOGGER.log(level, "monitoring started for job id: " + jobId);
         } else {
             LOGGER.info("No listener was found for the entity: " + entityId);
         }
@@ -140,9 +136,7 @@
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions)));
         } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
-            }
+            LOGGER.log(level, "NO NEED TO NOTIFY JOB FINISH!");
         }
     }
 
@@ -156,20 +150,16 @@
 
     @Override
     public IActiveEntityEventsListener getListener(EntityId entityId) {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
-            IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
-            LOGGER.log(Level.WARNING, "Listener found: " + listener);
-        }
+        LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
+        IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
+        LOGGER.log(level, "Listener found: " + listener);
         return entityEventListeners.get(entityId);
     }
 
     @Override
     public synchronized IActiveEntityEventsListener[] getEventListeners() {
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING, "getEventListeners() was called");
-            LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners");
-        }
+        LOGGER.log(level, "getEventListeners() was called");
+        LOGGER.log(level, "returning " + entityEventListeners.size() + " Listeners");
         return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]);
     }
 
@@ -178,11 +168,8 @@
         if (suspended) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
-        if (DEBUG) {
-            LOGGER.log(Level.WARNING,
-                    "registerListener(IActiveEntityEventsListener listener) was called for the entity "
-                            + listener.getEntityId());
-        }
+        LOGGER.log(level, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
+                + listener.getEntityId());
         if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId());
         }
@@ -194,7 +181,7 @@
         if (suspended) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
-        LOGGER.log(Level.FINE, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
+        LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
                 + listener.getEntityId());
         IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
         if (registeredListener == null) {
@@ -221,16 +208,16 @@
 
     @Override
     public synchronized void recover() throws HyracksDataException {
-        LOGGER.log(Level.FINE, "Starting active recovery");
+        LOGGER.log(level, "Starting active recovery");
         for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
             synchronized (listener) {
-                LOGGER.log(Level.FINE, "Entity " + listener.getEntityId() + " is " + listener.getStats());
+                LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getStats());
                 if (listener.getState() == ActivityState.PERMANENTLY_FAILED
                         && listener instanceof IActiveEntityController) {
-                    LOGGER.log(Level.FINE, "Recovering");
+                    LOGGER.log(level, "Recovering");
                     ((IActiveEntityController) listener).recover();
                 } else {
-                    LOGGER.log(Level.FINE, "Only notifying");
+                    LOGGER.log(level, "Only notifying");
                     listener.notifyAll();
                 }
             }
@@ -243,7 +230,7 @@
             if (suspended) {
                 throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
             }
-            LOGGER.log(Level.FINE, "Suspending active events handler");
+            LOGGER.log(level, "Suspending active events handler");
             suspended = true;
         }
         IMetadataLockManager lockManager = mdProvider.getApplicationContext().getMetadataLockManager();
@@ -253,27 +240,27 @@
             // exclusive lock all the datasets
             String dataverseName = listener.getEntityId().getDataverse();
             String entityName = listener.getEntityId().getEntityName();
-            LOGGER.log(Level.FINE, "Suspending " + listener.getEntityId());
-            LOGGER.log(Level.FINE, "Acquiring locks");
+            LOGGER.log(level, "Suspending " + listener.getEntityId());
+            LOGGER.log(level, "Acquiring locks");
             lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName);
             List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
             for (Dataset dataset : datasets) {
                 lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
                         DatasetUtil.getFullyQualifiedName(dataset));
             }
-            LOGGER.log(Level.FINE, "locks acquired");
+            LOGGER.log(level, "locks acquired");
             ((ActiveEntityEventsListener) listener).suspend(mdProvider);
-            LOGGER.log(Level.FINE, listener.getEntityId() + " suspended");
+            LOGGER.log(level, listener.getEntityId() + " suspended");
         }
     }
 
     public void resume(MetadataProvider mdProvider)
             throws AsterixException, HyracksDataException, InterruptedException {
-        LOGGER.log(Level.FINE, "Resuming active events handler");
+        LOGGER.log(level, "Resuming active events handler");
         for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
-            LOGGER.log(Level.FINE, "Resuming " + listener.getEntityId());
+            LOGGER.log(level, "Resuming " + listener.getEntityId());
             ((ActiveEntityEventsListener) listener).resume(mdProvider);
-            LOGGER.log(Level.FINE, listener.getEntityId() + " resumed");
+            LOGGER.log(level, listener.getEntityId() + " resumed");
         }
         synchronized (this) {
             suspended = false;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index 45c79a0..124e56e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -114,7 +114,7 @@
     @Override
     protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException {
         IActiveEntityEventSubscriber eventSubscriber =
-                new WaitForStateSubscriber(this, Collections.singleton(ActivityState.STOPPED));
+                new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED));
         try {
             // Construct ActiveMessage
             for (int i = 0; i < getLocations().getLocations().length; i++) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 9faa9e9..e7919fa 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -137,7 +137,6 @@
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e);
                 responseMsg.setError(new Exception(e.toString()));
             }
-
             try {
                 messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
             } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 29bc95e..7647881 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -24,6 +24,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -31,7 +33,6 @@
 import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.AsterixExtension;
@@ -113,7 +114,7 @@
     private ReplicationProperties replicationProperties;
     private MessagingProperties messagingProperties;
     private final NodeProperties nodeProperties;
-    private ThreadExecutor threadExecutor;
+    private ExecutorService threadExecutor;
     private IDatasetLifecycleManager datasetLifecycleManager;
     private IBufferCache bufferCache;
     private ITransactionSubsystem txnSubsystem;
@@ -164,7 +165,7 @@
     @Override
     public void initialize(boolean initialRun) throws IOException, ACIDException {
         ioManager = getServiceContext().getIoManager();
-        threadExecutor = new ThreadExecutor(getServiceContext().getThreadFactory());
+        threadExecutor = Executors.newCachedThreadPool(getServiceContext().getThreadFactory());
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
         IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator,
@@ -383,7 +384,7 @@
     }
 
     @Override
-    public ThreadExecutor getThreadExecutor() {
+    public ExecutorService getThreadExecutor() {
         return threadExecutor;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 452d13e..56975d1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -107,7 +107,7 @@
             }
             app.append("\r\n");
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 09c4983..9fc9940 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -272,8 +273,9 @@
             }
 
             // make connections between operators
-            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>,
-                    Pair<IOperatorDescriptor, Integer>>> entry : subJob.getConnectorOperatorMap().entrySet()) {
+            for (Entry<ConnectorDescriptorId,
+                       Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry
+                       : subJob.getConnectorOperatorMap().entrySet()) {
                 ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey());
                 IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId);
                 Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft();
@@ -381,7 +383,7 @@
 
     public static void SendStopMessageToNode(ICcApplicationContext appCtx, EntityId feedId, String intakeNodeLocation,
             Integer partition) throws Exception {
-        ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY,
+        ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(Kind.STOP_ACTIVITY,
                 new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
         SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
index 71cb038..74c4364 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
@@ -21,7 +21,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-abstract class Action {
+public abstract class Action {
     boolean done = false;
     HyracksDataException failure;
 
@@ -39,21 +39,21 @@
 
     protected abstract void doExecute(MetadataProvider mdProvider) throws Exception;
 
-    boolean hasFailed() {
+    public boolean hasFailed() {
         return failure != null;
     }
 
-    HyracksDataException getFailure() {
+    public HyracksDataException getFailure() {
         return failure;
     }
 
-    synchronized void sync() throws InterruptedException {
+    public synchronized void sync() throws InterruptedException {
         while (!done) {
             wait();
         }
     }
 
-    boolean isDone() {
+    public boolean isDone() {
         return done;
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index f8baa0e..e1fdb69 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.active.IActiveRuntime;
 import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.algebra.base.ILangExtension.Language;
 import org.apache.asterix.app.active.ActiveEntityEventsListener;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -126,8 +127,8 @@
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.contains("N/A"));
         // Fake partition message and notify eventListener
-        ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId,
-                ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
+        ActivePartitionMessage partitionMessage =
+                new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null);
         partitionMessage.handle(appCtx);
         start.sync();
         if (start.hasFailed()) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 3f68651..8d21b55 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -21,7 +21,7 @@
 import org.apache.asterix.active.SingleThreadEventProcessor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 
-class Actor extends SingleThreadEventProcessor<Action> {
+public class Actor extends SingleThreadEventProcessor<Action> {
 
     private final MetadataProvider actorMdProvider;
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
index e7e21b6..99499a3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.hyracks.api.job.JobId;
 
@@ -41,9 +42,8 @@
         Action registration = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
-                ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId,
-                        new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId,
-                                ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null));
+                ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+                        new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_REGISTERED, null));
                 clusterController.activeEvent(event);
             }
         };
@@ -55,9 +55,8 @@
         Action registration = new Action() {
             @Override
             protected void doExecute(MetadataProvider actorMdProvider) throws Exception {
-                ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId,
-                        new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId,
-                                ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null));
+                ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+                        new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null));
                 clusterController.activeEvent(event);
             }
         };
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 1c29e86..a96adc0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -103,13 +103,13 @@
     // see
     // https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers/417184
     private static final long MAX_URL_LENGTH = 2000l;
-    private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/",
-            Pattern.MULTILINE | Pattern.DOTALL);
+    private static final Pattern JAVA_BLOCK_COMMENT_PATTERN =
+            Pattern.compile("/\\*.*\\*/", Pattern.MULTILINE | Pattern.DOTALL);
     private static final Pattern JAVA_LINE_COMMENT_PATTERN = Pattern.compile("^//.*$", Pattern.MULTILINE);
     private static final Pattern SHELL_LINE_COMMENT_PATTERN = Pattern.compile("^#.*$", Pattern.MULTILINE);
     private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$");
-    private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)",
-            Pattern.MULTILINE);
+    private static final Pattern POLL_TIMEOUT_PATTERN =
+            Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
     private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE);
     private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)");
     private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)");
@@ -168,10 +168,10 @@
     public void runScriptAndCompareWithResult(File scriptFile, PrintWriter print, File expectedFile, File actualFile,
             ComparisonEnum compare) throws Exception {
         System.err.println("Expected results file: " + expectedFile.toString());
-        BufferedReader readerExpected = new BufferedReader(
-                new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
-        BufferedReader readerActual = new BufferedReader(
-                new InputStreamReader(new FileInputStream(actualFile), "UTF-8"));
+        BufferedReader readerExpected =
+                new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
+        BufferedReader readerActual =
+                new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8"));
         boolean regex = false;
         try {
             if (ComparisonEnum.BINARY.equals(compare)) {
@@ -354,10 +354,10 @@
     public void runScriptAndCompareWithResultRegex(File scriptFile, File expectedFile, File actualFile)
             throws Exception {
         String lineExpected, lineActual;
-        try (BufferedReader readerExpected = new BufferedReader(
-                new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
-                BufferedReader readerActual = new BufferedReader(
-                        new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) {
+        try (BufferedReader readerExpected =
+                new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8"));
+                BufferedReader readerActual =
+                        new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) {
             StringBuilder actual = new StringBuilder();
             while ((lineActual = readerActual.readLine()) != null) {
                 actual.append(lineActual).append('\n');
@@ -534,7 +534,7 @@
         return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false);
     }
 
-    protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params,
+    public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params,
             boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception {
         final List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType());
         HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams)
@@ -697,8 +697,8 @@
     // Insert and Delete statements are executed here
     public void executeUpdate(String str, URI uri) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
-                .build();
+        HttpUriRequest request =
+                RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build();
 
         // Execute the method.
         executeAndCheckHttpRequest(request);
@@ -708,10 +708,10 @@
     public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri,
             Map<String, Object> variableCtx) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(uri)
-                .addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
-                .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType())
-                .build();
+        HttpUriRequest request =
+                RequestBuilder.post(uri).addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous")
+                        .setEntity(new StringEntity(statement, StandardCharsets.UTF_8))
+                        .setHeader("Accept", fmt.mimeType()).build();
 
         String handleVar = getHandleVariable(statement);
 
@@ -737,8 +737,8 @@
     // create function statement
     public void executeDDL(String str, URI uri) throws Exception {
         // Create a method instance.
-        HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8))
-                .build();
+        HttpUriRequest request =
+                RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build();
 
         // Execute the method.
         executeAndCheckHttpRequest(request);
@@ -748,8 +748,8 @@
     // and returns the contents as a string
     // This string is later passed to REST API for execution.
     public String readTestFile(File testFile) throws Exception {
-        BufferedReader reader = new BufferedReader(
-                new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8));
+        BufferedReader reader =
+                new BufferedReader(new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8));
         String line;
         StringBuilder stringBuilder = new StringBuilder();
         String ls = System.getProperty("line.separator");
@@ -804,8 +804,8 @@
 
     private static String getProcessOutput(Process p) throws Exception {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        Future<Integer> future = Executors.newSingleThreadExecutor()
-                .submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() {
+        Future<Integer> future =
+                Executors.newSingleThreadExecutor().submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() {
                     @Override
                     public void write(int b) throws IOException {
                         baos.write(b);
@@ -1357,7 +1357,7 @@
                         if (failedGroup != null) {
                             failedGroup.getTestCase().add(testCaseCtx.getTestCase());
                         }
-                        throw new Exception("Test \"" + testFile + "\" FAILED!");
+                        throw new Exception("Test \"" + testFile + "\" FAILED!", e);
                     }
                 } finally {
                     if (numOfFiles == testFileCtxs.size()) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
index d0b0ea0..0fcdf15 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm
@@ -4,8 +4,7 @@
       "adapter-stats" : {
         "incoming-records-count" : 13,
         "failed-at-parser-records-count" : 3
-      },
-      "executor-restart-times" : 0
+      }
     } ]
   }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3da58e9..8abbeab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -438,14 +438,15 @@
                 try {
                     dsInfo.wait();
                 } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
                 }
             }
         }
         try {
             flushDatasetOpenIndexes(dsInfo, false);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
         for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
             if (iInfo.isOpen()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 94fe951..5aed5f2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -228,6 +228,7 @@
     public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107;
     public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108;
     public static final int METADATA_DROP_FUCTION_IN_USE = 3109;
+    public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110;
 
     // Lifecycle management errors
     public static final int DUPLICATE_PARTITION_ID = 4000;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index a74898e..a4c41df 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -120,7 +120,7 @@
         try {
             return new ObjectMapper().writeValueAsString(this);
         } catch (JsonProcessingException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -128,7 +128,7 @@
         try {
             return new ObjectMapper().readValue(json, Checkpoint.class);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
index 49f5457..229fb6d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java
@@ -18,9 +18,10 @@
  */
 package org.apache.asterix.common.transactions;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -29,7 +30,7 @@
 
 public interface IAppRuntimeContextProvider {
 
-    ThreadExecutor getThreadExecutor();
+    ExecutorService getThreadExecutor();
 
     IBufferCache getBufferCache();
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 2e98abd..824e30b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -126,7 +126,7 @@
             }
             return file;
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 4cc06a6..a7a1990 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -217,6 +217,7 @@
 3107 = Active Notification Handler is already suspended
 3108 = Feed stopped while waiting for a new record
 3109 = Function %1$s is being used. It cannot be dropped.
+3110 = Feed failed while reading a new record
 
 # Lifecycle management errors
 4000 = Partition id %1$d for node %2$s already in use by node %3$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
index a31b46d..b51416a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java
@@ -74,7 +74,7 @@
             return new LookupAdapter<>(dataParser, reader, inRecDesc, ridReader, retainInput, retainMissing,
                     isMissingWriterFactory, ctx, writer);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
index def0bf1..7412338 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java
@@ -18,27 +18,29 @@
  */
 package org.apache.asterix.external.api;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+@FunctionalInterface
 public interface IDataFlowController {
 
-    //TODO: Refactor this interface. Remove writer from start() signature
     public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException;
 
     public default boolean pause() throws HyracksDataException {
-        throw new HyracksDataException("Method not implemented");
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
     public default boolean resume() throws HyracksDataException {
-        throw new HyracksDataException("Method not implemented");
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
     public default void flush() throws HyracksDataException {
-        throw new HyracksDataException("Method not implemented");
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 
     public default boolean stop() throws HyracksDataException {
-        throw new HyracksDataException("Method not implemented");
+        throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED);
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
index e62672d..472cdae 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.api;
 
-import java.io.Serializable;
-
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -30,7 +28,7 @@
  * adapter(pull or push).
  */
 @FunctionalInterface
-public interface IDataSourceAdapter extends Serializable {
+public interface IDataSourceAdapter {
 
     public enum AdapterType {
         INTERNAL,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index c87fe2d..53fa137 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -57,14 +57,5 @@
         tupleForwarder.flush();
     }
 
-    @Override
-    public abstract boolean stop() throws HyracksDataException;
-
-    public abstract boolean handleException(Throwable th) throws HyracksDataException;
-
     public abstract String getStats();
-
-    public void fail() throws HyracksDataException {
-        tupleForwarder.fail();
-    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 5b9b96f..e24c26d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -32,16 +34,20 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
+    public enum State {
+        CREATED,
+        STARTED,
+        STOPPED
+    }
+
     private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName());
     private final IRecordDataParser<T> dataParser;
     private final IRecordReader<T> recordReader;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
     protected static final long INTERVAL = 1000;
-    protected boolean failed = false;
+    protected State state = State.CREATED;
     protected long incomingRecordsCount = 0;
     protected long failedRecordsCount = 0;
 
@@ -57,8 +63,15 @@
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException {
+        synchronized (this) {
+            if (state == State.STOPPED) {
+                return;
+            } else {
+                setState(State.STARTED);
+            }
+        }
+        Exception failure = null;
         try {
-            failed = false;
             tupleForwarder.initialize(ctx, writer);
             while (hasNext()) {
                 IRawRecord<? extends T> record = next();
@@ -74,21 +87,48 @@
                 }
             }
         } catch (HyracksDataException e) {
-            LOGGER.log(Level.WARN, e);
+            LOGGER.log(Level.WARNING, "Exception during ingestion", e);
             //if interrupted while waiting for a new record, then it is safe to not fail forward
             if (e.getComponent() == ErrorCode.ASTERIX
-                    && e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD) {
-                // Do nothing
+                    && (e.getErrorCode() == ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD)) {
+                // Do nothing. interrupted by the active manager
+            } else if (e.getComponent() == ErrorCode.ASTERIX
+                    && (e.getErrorCode() == ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) {
+                // Failure but we know we can for sure push the previously parsed records safely
+                failure = e;
+                try {
+                    flush();
+                } catch (Exception flushException) {
+                    tupleForwarder.fail();
+                    flushException.addSuppressed(e);
+                    failure = flushException;
+                }
             } else {
-                failed = true;
-                throw e;
+                failure = e;
+                tupleForwarder.fail();
             }
         } catch (Exception e) {
-            failed = true;
-            LOGGER.warn("Failure while operating a feed source", e);
-            throw HyracksDataException.create(e);
+            failure = e;
+            tupleForwarder.fail();
+            LOGGER.log(Level.WARNING, "Failure while operating a feed source", e);
+        } finally {
+            failure = finish(failure);
         }
-        finish();
+        if (failure != null) {
+            if (failure instanceof InterruptedException) {
+                throw (InterruptedException) failure;
+            }
+            throw HyracksDataException.create(failure);
+        }
+    }
+
+    private synchronized void setState(State newState) {
+        LOGGER.log(Level.INFO, "State is being set from " + state + " to " + newState);
+        state = newState;
+    }
+
+    public synchronized State getState() {
+        return state;
     }
 
     private IRawRecord<? extends T> next() throws HyracksDataException {
@@ -97,47 +137,58 @@
         } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
             throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
         } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            if (!recordReader.handleException(e)) {
+                throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
+            }
+            return null;
         }
     }
 
     private boolean hasNext() throws HyracksDataException {
-        boolean hasNext;
-        try {
-            hasNext = recordReader.hasNext();
-        } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
-            throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+        while (true) {
+            try {
+                return recordReader.hasNext();
+            } catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
+                throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
+            } catch (Exception e) {
+                if (!recordReader.handleException(e)) {
+                    throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
+                }
+            }
         }
-        return hasNext;
     }
 
-    private void finish() throws HyracksDataException {
+    private Exception finish(Exception failure) {
         HyracksDataException hde = null;
         try {
-            tupleForwarder.close();
-        } catch (Throwable th) {
+            recordReader.close();
+        } catch (Exception th) {
+            LOGGER.log(Level.WARNING, "Failure during while operating a feed source", th);
             hde = HyracksDataException.suppress(hde, th);
         }
         try {
-            recordReader.close();
-        } catch (Throwable th) {
-            LOGGER.warn("Failure during while operating a feed sourcec", th);
+            tupleForwarder.close();
+        } catch (Exception th) {
             hde = HyracksDataException.suppress(hde, th);
         } finally {
             closeSignal();
         }
+        setState(State.STOPPED);
         if (hde != null) {
-            throw hde;
+            if (failure != null) {
+                failure.addSuppressed(hde);
+            } else {
+                return hde;
+            }
         }
+        return failure;
     }
 
     private boolean parseAndForward(IRawRecord<? extends T> record) throws IOException {
         try {
             dataParser.parse(record, tb.getDataOutput());
         } catch (Exception e) {
-            LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+            LOGGER.log(Level.WARNING, ExternalDataConstants.ERROR_PARSE_RECORD, e);
             feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
             // continue the outer loop
             return false;
@@ -172,44 +223,31 @@
 
     @Override
     public boolean stop() throws HyracksDataException {
-        HyracksDataException hde = null;
+        synchronized (this) {
+            switch (state) {
+                case CREATED:
+                case STOPPED:
+                    setState(State.STOPPED);
+                    return true;
+                case STARTED:
+                    break;
+                default:
+                    throw new HyracksDataException("unknown state " + state);
+
+            }
+        }
         if (recordReader.stop()) {
-            if (failed) {
-                // failed, close here
-                try {
-                    tupleForwarder.close();
-                } catch (Throwable th) {
-                    hde = HyracksDataException.suppress(hde, th);
-                }
-                try {
-                    recordReader.close();
-                } catch (Throwable th) {
-                    hde = HyracksDataException.suppress(hde, th);
-                }
-                if (hde != null) {
-                    throw hde;
-                }
-            } else {
-                try {
-                    waitForSignal();
-                } catch (InterruptedException e) {
-                    throw HyracksDataException.create(e);
-                }
+            try {
+                waitForSignal();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
             }
             return true;
         }
         return false;
     }
 
-    @Override
-    public boolean handleException(Throwable th) throws HyracksDataException {
-        // This is not a parser record. most likely, this error happened in the record reader.
-        if (!recordReader.handleException(th)) {
-            finish();
-        }
-        return !closed.get();
-    }
-
     public IRecordReader<T> getReader() {
         return recordReader;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index cad11cd..1f1f545 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -43,8 +43,7 @@
         try {
             tupleForwarder.initialize(ctx, writer);
             while (true) {
-                tb.reset();
-                if (!dataParser.parse(tb.getDataOutput())) {
+                if (!parseNext()) {
                     break;
                 }
                 tb.addFieldEndOffset();
@@ -52,12 +51,25 @@
                 incomingRecordsCount++;
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         } finally {
             tupleForwarder.close();
         }
     }
 
+    private boolean parseNext() throws HyracksDataException {
+        while (true) {
+            try {
+                tb.reset();
+                return dataParser.parse(tb.getDataOutput());
+            } catch (Exception e) {
+                if (!handleException(e)) {
+                    throw e;
+                }
+            }
+        }
+    }
+
     @Override
     public boolean stop() throws HyracksDataException {
         try {
@@ -71,8 +83,7 @@
         return false;
     }
 
-    @Override
-    public boolean handleException(Throwable th) {
+    private boolean handleException(Throwable th) {
         boolean handled = true;
         try {
             handled &= stream.handleException(th);
@@ -86,6 +97,7 @@
         return handled;
     }
 
+    @Override
     public String getStats() {
         return "{\"incoming-records-number\": " + incomingRecordsCount + "}";
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 3a8130b..f824b67 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -103,7 +103,7 @@
                 if (throwable != null) {
                     throwable.addSuppressed(e);
                 } else {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             } catch (Throwable th) {
                 if (throwable != null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index 2b06775..c4f75e3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -32,8 +32,8 @@
     private final IExternalIndexer indexer;
 
     public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader,
-            IExternalIndexer indexer) throws IOException {
+            IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader, IExternalIndexer indexer)
+            throws IOException {
         super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields());
         this.indexer = indexer;
     }
@@ -43,7 +43,7 @@
         try {
             indexer.index(tb);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index eb5527f..f34b77d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -71,7 +71,8 @@
             try {
                 Thread.sleep(interTupleInterval);
             } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
             }
         }
         boolean success = appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 9f32a25..eeda80c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -24,7 +24,6 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class FeedAdapter implements IDataSourceAdapter {
-    private static final long serialVersionUID = 1L;
     private final AbstractFeedDataFlowController controller;
 
     public FeedAdapter(AbstractFeedDataFlowController controller) {
@@ -40,10 +39,6 @@
         return controller.stop();
     }
 
-    public boolean handleException(Throwable e) throws HyracksDataException {
-        return controller.handleException(e);
-    }
-
     public boolean pause() throws HyracksDataException {
         return controller.pause();
     }
@@ -56,7 +51,7 @@
         return controller.getStats();
     }
 
-    public void fail() throws HyracksDataException {
-        controller.fail();
+    public AbstractFeedDataFlowController getController() {
+        return controller;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 0681d71..916fe0a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -25,7 +25,6 @@
 
 public class GenericAdapter implements IDataSourceAdapter {
 
-    private static final long serialVersionUID = 1L;
     private final IDataFlowController controller;
 
     public GenericAdapter(IDataFlowController controller) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
deleted file mode 100644
index d102d0c..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-/**
- * The class in charge of executing feed adapters.
- */
-public class AdapterExecutor implements Runnable {
-
-    private static final Logger LOGGER = Logger.getLogger(AdapterExecutor.class.getName());
-
-    private final IFrameWriter writer; // A writer that sends frames to multiple receivers (that can
-    // increase or decrease at any time)
-    private final FeedAdapter adapter; // The adapter
-    private final AdapterRuntimeManager adapterManager;// The runtime manager <-- two way visibility -->
-    private int restartCount = 0;
-
-    public AdapterExecutor(IFrameWriter writer, FeedAdapter adapter, AdapterRuntimeManager adapterManager) {
-        this.writer = writer;
-        this.adapter = adapter;
-        this.adapterManager = adapterManager;
-    }
-
-    @Override
-    public void run() {
-        // Start by getting the partition number from the manager
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Starting ingestion for partition:" + adapterManager.getPartition());
-        }
-        boolean failed = false;
-        try {
-            failed = doRun();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            failed = true;
-            LOGGER.error("Unhandled Exception", e);
-        } finally {
-            // Done with the adapter. about to close, setting the stage based on the failed ingestion flag and notifying
-            // the runtime manager
-            adapterManager.setFailed(failed);
-            adapterManager.setDone(true);
-            synchronized (adapterManager) {
-                adapterManager.notifyAll();
-            }
-        }
-    }
-
-    private boolean doRun() throws HyracksDataException, InterruptedException {
-        boolean continueIngestion = true;
-        boolean failedIngestion = false;
-        while (continueIngestion) {
-            try {
-                // Start the adapter
-                adapter.start(adapterManager.getPartition(), writer);
-                // Adapter has completed execution
-                continueIngestion = false;
-            } catch (InterruptedException e) {
-                adapter.fail();
-                throw e;
-            } catch (Exception e) {
-                LOGGER.error("Exception during feed ingestion ", e);
-                continueIngestion = adapter.handleException(e);
-                if (!continueIngestion) {
-                    adapter.fail();
-                }
-                failedIngestion = !continueIngestion;
-                restartCount++;
-            }
-        }
-        return failedIngestion;
-    }
-
-    public String getStats() {
-        return "{\"adapter-stats\": " + adapter.getStats() + ", \"executor-restart-times\": " + restartCount + "}";
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
deleted file mode 100644
index 1b5eeac..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * This class manages the execution of an adapter within a feed
- */
-public class AdapterRuntimeManager {
-
-    private static final Logger LOGGER = Logger.getLogger(AdapterRuntimeManager.class.getName());
-
-    private final EntityId feedId; // (dataverse-feed)
-
-    private final FeedAdapter feedAdapter; // The adapter
-
-    private final AdapterExecutor adapterExecutor; // The executor for the adapter
-
-    private final int partition; // The partition number
-
-    private final IHyracksTaskContext ctx;
-
-    private Future<?> execution;
-
-    private boolean started = false;
-    private volatile boolean done = false;
-    private volatile boolean failed = false;
-
-    public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
-            IFrameWriter writer, int partition) {
-        this.ctx = ctx;
-        this.feedId = entityId;
-        this.feedAdapter = feedAdapter;
-        this.partition = partition;
-        this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this);
-    }
-
-    public void start() {
-        synchronized (adapterExecutor) {
-            started = true;
-            if (!done) {
-                execution = ctx.getExecutorService().submit(adapterExecutor);
-            } else {
-                LOGGER.log(Level.WARNING, "Someone stopped me before I even start. I will simply not start");
-            }
-        }
-    }
-
-    public void stop() throws HyracksDataException, InterruptedException {
-        synchronized (adapterExecutor) {
-            try {
-                if (started) {
-                    try {
-                        ctx.getExecutorService().submit(() -> {
-                            if (feedAdapter.stop()) {
-                                execution.get();
-                            }
-                            return null;
-                        }).get(30, TimeUnit.SECONDS);
-                    } catch (InterruptedException e) {
-                        LOGGER.log(Level.WARNING, "Interrupted while trying to stop an adapter runtime", e);
-                        throw e;
-                    } catch (Exception e) {
-                        LOGGER.log(Level.WARNING, "Exception while trying to stop an adapter runtime", e);
-                        throw HyracksDataException.create(e);
-                    } finally {
-                        execution.cancel(true);
-                    }
-                } else {
-                    LOGGER.log(Level.WARNING, "Adapter executor was stopped before it starts");
-                }
-            } finally {
-                done = true;
-            }
-        }
-    }
-
-    public EntityId getFeedId() {
-        return feedId;
-    }
-
-    @Override
-    public String toString() {
-        return feedId + "[" + partition + "]";
-    }
-
-    public FeedAdapter getFeedAdapter() {
-        return feedAdapter;
-    }
-
-    public AdapterExecutor getAdapterExecutor() {
-        return adapterExecutor;
-    }
-
-    public int getPartition() {
-        return partition;
-    }
-
-    public boolean isFailed() {
-        return failed;
-    }
-
-    public void setFailed(boolean failed) {
-        this.failed = failed;
-    }
-
-    public boolean isDone() {
-        return done;
-    }
-
-    public void setDone(boolean done) {
-        this.done = done;
-    }
-
-    public String getStats() {
-        return adapterExecutor.getStats();
-    }
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
index cbf784e..982cf5b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/HDFSDataSourceFactory.java
@@ -137,7 +137,7 @@
             restoreConfig(ctx);
             return new HDFSInputStream(read, inputSplits, readSchedule, nodeName, conf, configuration, files, indexer);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 00ac090..c5ca129 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -21,16 +21,17 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
 
 public class LocalFSInputStream extends AsterixInputStream {
 
@@ -155,24 +156,25 @@
         if (in == null) {
             return false;
         }
-        if (th instanceof HyracksDataException
-                && ((HyracksDataException) th).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
+        Throwable root = ExceptionUtils.getRootCause(th);
+        if (root instanceof HyracksDataException
+                && ((HyracksDataException) root).getErrorCode() == ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM) {
             if (currentFile != null) {
                 try {
                     logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
                 } catch (IOException e) {
-                    LOGGER.warn("Filed to write to feed log file", e);
+                    LOGGER.log(Level.WARNING, "Filed to write to feed log file", e);
                 }
-                LOGGER.warn("Corrupted input file: " + currentFile.getAbsolutePath());
+                LOGGER.log(Level.WARNING, "Corrupted input file: " + currentFile.getAbsolutePath());
             }
             try {
                 advance();
                 return true;
             } catch (Exception e) {
-                LOGGER.warn("An exception was thrown while trying to skip a file", e);
+                LOGGER.log(Level.WARNING, "An exception was thrown while trying to skip a file", e);
             }
         }
-        LOGGER.warn("Failed to recover from failure", th);
+        LOGGER.log(Level.WARNING, "Failed to recover from failure", th);
         return false;
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
index 9a0e718..caeaa07 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketClientInputStreamFactory.java
@@ -84,7 +84,7 @@
         try {
             return new SocketClientInputStream(sockets.get(partition));
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index 05931b2..1f1fa5c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -124,7 +124,7 @@
             server.bind(new InetSocketAddress(socket.second));
             return new SocketServerInputStream(server);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index b32006c..12be449 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -97,7 +97,7 @@
         try {
             return new TwitterFirehoseInputStream(configuration, partition);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 0d485924..2b5e248 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -23,6 +23,8 @@
 import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -95,6 +97,11 @@
 
 public class JObjectAccessors {
 
+    private static final Logger LOGGER = Logger.getLogger(JObjectAccessors.class.getName());
+
+    private JObjectAccessors() {
+    }
+
     public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
         IJObjectAccessor accessor = null;
         switch (aTypeTag) {
@@ -200,18 +207,16 @@
         @Override
         public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objPool)
                 throws HyracksDataException {
-            IJObject jObject = objPool.allocate(BuiltinType.ANULL);
-            return jObject;
+            return objPool.allocate(BuiltinType.ANULL);
         }
     }
 
-    public static class JMissingAccessor implements  IJObjectAccessor {
+    public static class JMissingAccessor implements IJObjectAccessor {
 
         @Override
         public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objPool)
                 throws HyracksDataException {
-            IJObject jObject = objPool.allocate(BuiltinType.AMISSING);
-            return jObject;
+            return objPool.allocate(BuiltinType.AMISSING);
         }
     }
 
@@ -271,7 +276,7 @@
             try {
                 v = reader.readUTF(new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1)));
             } catch (IOException e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
             JObjectUtil.getNormalizedString(v);
 
@@ -539,8 +544,8 @@
                 }
 
             } catch (Exception e) {
-                e.printStackTrace();
-                throw new HyracksDataException(e);
+                LOGGER.log(Level.WARNING, "Failure while accessing a java record", e);
+                throw HyracksDataException.create(e);
             }
             return jRecord;
         }
@@ -593,7 +598,7 @@
                     list.add(listItem);
                 }
             } catch (AsterixException exception) {
-                throw new HyracksDataException(exception);
+                throw HyracksDataException.create(exception);
             }
             return list;
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index 45d424e..242773e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -89,7 +89,7 @@
             try {
                 bulkLoader.end();
             } catch (Throwable th) {
-                throw new HyracksDataException(th);
+                throw HyracksDataException.create(th);
             } finally {
                 try {
                     indexHelper.close();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index 6299982..c096f69 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -89,18 +89,21 @@
                     try {
                         snapshotAccessor.close();
                     } catch (Throwable th) {
-                        hde = new HyracksDataException(th);
+                        hde = HyracksDataException.create(th);
                     }
                     try {
                         adapter.close();
                     } catch (Throwable th) {
                         if (hde == null) {
-                            hde = new HyracksDataException(th);
+                            hde = HyracksDataException.create(th);
                         } else {
                             hde.addSuppressed(th);
                         }
                     }
                 }
+                if (hde != null) {
+                    throw hde;
+                }
             }
 
             @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 3a06a2b..770e978 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -96,8 +96,7 @@
         if (adaptorFactory == null) {
             adaptorFactory = createExternalAdapterFactory(ctx);
         }
-        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, policyAccessor,
-                recordDescProvider, this);
+        return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, recordDescProvider, this);
     }
 
     private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException {
@@ -112,7 +111,7 @@
                 adapterFactory.setOutputType(adapterOutputType);
                 adapterFactory.configure(ctx.getJobletContext().getServiceContext(), adaptorConfiguration);
             } catch (Exception e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
         } else {
             RuntimeDataException err = new RuntimeDataException(
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 8c6a420..16b8fba 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -18,15 +18,14 @@
  */
 package org.apache.asterix.external.operators;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
 import org.apache.asterix.active.EntityId;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -42,74 +41,97 @@
  * The artifacts are lazily activated when a feed receives a subscription request.
  */
 public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePushable {
-
-    private final int partition;
-    private final IAdapterFactory adapterFactory;
+    private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorNodePushable.class.getName());
     private final FeedIntakeOperatorDescriptor opDesc;
-    private volatile AdapterRuntimeManager adapterRuntimeManager;
+    private final FeedAdapter adapter;
+    private boolean poisoned = false;
 
     public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory,
-            int partition, FeedPolicyAccessor policyAccessor, IRecordDescriptorProvider recordDescProvider,
-            FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) {
+            int partition, IRecordDescriptorProvider recordDescProvider,
+            FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) throws HyracksDataException {
         super(ctx, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
         this.opDesc = feedIntakeOperatorDescriptor;
         this.recordDesc = recordDescProvider.getOutputRecordDescriptor(opDesc.getActivityId(), 0);
-        this.partition = partition;
-        this.adapterFactory = adapterFactory;
+        adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, runtimeId.getPartition());
     }
 
     @Override
     protected void start() throws HyracksDataException, InterruptedException {
+        String before = Thread.currentThread().getName();
+        Thread.currentThread().setName("Intake Thread");
         try {
             writer.open();
-            Thread.currentThread().setName("Intake Thread");
-            FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
-            adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition);
-            IFrame message = new VSizeFrame(ctx);
-            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
+            synchronized (this) {
+                if (poisoned) {
+                    return;
+                }
+            }
             /*
              * Set null feed message. Feed pipeline carries with it a message with each frame
              * Initially, the message is set to a null message that can be changed by feed adapters.
              * One use case is adapters which consume data sources that allow restartability. Such adapters
              * can propagate progress information through the ingestion pipeline to storage nodes
              */
+            IFrame message = new VSizeFrame(ctx);
+            TaskUtil.put(HyracksConstants.KEY_MESSAGE, message, ctx);
             message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
             message.getBuffer().flip();
-            adapterRuntimeManager.start();
-            synchronized (adapterRuntimeManager) {
-                while (!adapterRuntimeManager.isDone()) {
-                    adapterRuntimeManager.wait();
-                }
-            }
-            if (adapterRuntimeManager.isFailed()) {
-                throw new RuntimeDataException(
-                        ErrorCode.OPERATORS_FEED_INTAKE_OPERATOR_NODE_PUSHABLE_FAIL_AT_INGESTION);
-            }
+            run();
         } catch (Exception e) {
-            /*
-             * An Interrupted Exception is thrown if the Intake job cannot progress further due to failure of another
-             * node involved in the Hyracks job. As the Intake job involves only the intake operator, the exception is
-             * indicative of a failure at the sibling intake operator location. The surviving intake partitions must
-             * continue to live and receive data from the external source.
-             */
-            writer.fail();
+            LOGGER.log(Level.WARNING, "Failure during data ingestion", e);
             throw e;
         } finally {
             writer.close();
+            Thread.currentThread().setName(before);
+        }
+    }
+
+    private void run() throws HyracksDataException {
+        // Start by getting the partition number from the manager
+        LOGGER.info("Starting ingestion for partition:" + ctx.getTaskAttemptId().getTaskId().getPartition());
+        try {
+            doRun();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw HyracksDataException.create(e);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Unhandled Exception", e);
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void doRun() throws HyracksDataException, InterruptedException {
+        while (true) {
+            try {
+                // Start the adapter
+                adapter.start(ctx.getTaskAttemptId().getTaskId().getPartition(), writer);
+                // Adapter has completed execution
+                return;
+            } catch (InterruptedException e) {
+                throw e;
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Exception during feed ingestion ", e);
+                throw HyracksDataException.create(e);
+            }
         }
     }
 
     @Override
     protected void abort() throws HyracksDataException, InterruptedException {
-        if (adapterRuntimeManager != null) {
-            adapterRuntimeManager.stop();
+        LOGGER.info(runtimeId + " aborting...");
+        synchronized (this) {
+            poisoned = true;
+            if (!adapter.stop()) {
+                LOGGER.info(runtimeId + " failed to stop adapter. interrupting the thread...");
+                taskThread.interrupt();
+            }
         }
     }
 
     @Override
     public String getStats() {
-        if (adapterRuntimeManager != null) {
-            return adapterRuntimeManager.getStats();
+        if (adapter != null) {
+            return "{\"adapter-stats\": " + adapter.getStats() + "}";
         } else {
             return "\"Runtime stats is not available.\"";
         }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
index e965dce..5fc9df3 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/classad/BuiltinClassAdFunctions.java
@@ -1849,7 +1849,7 @@
             result.setBooleanValue(false);
 
             List<String> list0 = objectPool.stringArrayListPool.get();
-            Set<String> set1 = new HashSet<String>();
+            Set<String> set1 = new HashSet<>();
 
             split_string_list(str0, have_delimiter ? delimiter_string.charAt(0) : ',', list0);
             split_string_set(str1, have_delimiter ? delimiter_string.charAt(0) : ',', set1);
@@ -1943,7 +1943,7 @@
                     return true;
                 }
             } catch (IOException e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
             state.decrementDepth();
             expr.setParentScope(state.getCurAd());
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index c362969..2273bea 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -39,8 +39,6 @@
 
 public class TestTypedAdapter extends FeedAdapter {
 
-    private static final long serialVersionUID = 1L;
-
     private final PipedOutputStream pos;
 
     private final PipedInputStream pis;
@@ -145,11 +143,6 @@
     }
 
     @Override
-    public boolean handleException(Throwable e) {
-        return false;
-    }
-
-    @Override
     public boolean pause() {
         return false;
     }
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 1c28940..5262e1f 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -106,7 +106,7 @@
                             }
                             forwarder.close();
                         } catch (Exception e) {
-                            throw new HyracksDataException(e);
+                            throw HyracksDataException.create(e);
                         }
                     }
                 };
@@ -115,7 +115,7 @@
         try {
             return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
index f64206e..f15b1e5 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/builders/AbstractListBuilder.java
@@ -95,14 +95,14 @@
                 this.outputStream.write(data, start + 1, len - 1);
             }
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
     private boolean toWriteTag(byte serializedTypeTag) {
         boolean toWriteTag = itemTypeTag == ATypeTag.ANY;
-        toWriteTag = toWriteTag
-                || (itemTypeTag == ATypeTag.NULL && serializedTypeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+        toWriteTag =
+                toWriteTag || (itemTypeTag == ATypeTag.NULL && serializedTypeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG);
         return toWriteTag
                 || (itemTypeTag == ATypeTag.MISSING && serializedTypeTag == ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
index 111557a..5df04f8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordAddFieldsDescriptor.java
@@ -118,17 +118,17 @@
                     private final RecordBuilder recordBuilder = new RecordBuilder();
                     private final RuntimeRecordTypeInfo requiredRecordTypeInfo = new RuntimeRecordTypeInfo();
 
-                    private final IBinaryHashFunction putHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE
-                            .createBinaryHashFunction();
-                    private final IBinaryHashFunction getHashFunc = ListItemBinaryHashFunctionFactory.INSTANCE
-                            .createBinaryHashFunction();
+                    private final IBinaryHashFunction putHashFunc =
+                            ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction();
+                    private final IBinaryHashFunction getHashFunc =
+                            ListItemBinaryHashFunctionFactory.INSTANCE.createBinaryHashFunction();
                     private final BinaryEntry keyEntry = new BinaryEntry();
                     private final BinaryEntry valEntry = new BinaryEntry();
                     private final IVisitablePointable tempValReference = allocator.allocateEmpty();
-                    private final IBinaryComparator cmp = ListItemBinaryComparatorFactory.INSTANCE
-                            .createBinaryComparator();
-                    private BinaryHashMap hashMap = new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc,
-                            getHashFunc, cmp);
+                    private final IBinaryComparator cmp =
+                            ListItemBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+                    private BinaryHashMap hashMap =
+                            new BinaryHashMap(TABLE_SIZE, TABLE_FRAME_SIZE, putHashFunc, getHashFunc, cmp);
                     private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
                     private DataOutput out = resultStorage.getDataOutput();
 
@@ -157,7 +157,6 @@
                         vp0.set(argPtr0);
                         vp1.set(argPtr1);
 
-
                         ARecordVisitablePointable recordPointable = (ARecordVisitablePointable) vp0;
                         AListVisitablePointable listPointable = (AListVisitablePointable) vp1;
 
@@ -207,10 +206,10 @@
                                     throw new AsterixException("Expected list of record, got "
                                             + PointableHelper.getTypeTag(inputFields.get(i)));
                                 }
-                                List<IVisitablePointable> names = ((ARecordVisitablePointable) inputFields.get(i))
-                                        .getFieldNames();
-                                List<IVisitablePointable> values = ((ARecordVisitablePointable) inputFields.get(i))
-                                        .getFieldValues();
+                                List<IVisitablePointable> names =
+                                        ((ARecordVisitablePointable) inputFields.get(i)).getFieldNames();
+                                List<IVisitablePointable> values =
+                                        ((ARecordVisitablePointable) inputFields.get(i)).getFieldValues();
 
                                 // Get name and value of the field to be added
                                 // Use loop to account for the cases where users switches the order of the fields
@@ -241,8 +240,7 @@
                                     tempValReference.set(entry.getBuf(), entry.getOffset(), entry.getLength());
                                     // If value is not equal throw conflicting duplicate field, otherwise ignore
                                     if (!PointableHelper.byteArrayEqual(valuePointable, tempValReference)) {
-                                        throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME,
-                                                getIdentifier());
+                                        throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, getIdentifier());
                                     }
                                 } else {
                                     if (pos > -1) {
@@ -256,7 +254,7 @@
                                 }
                             }
                         } catch (AsterixException e) {
-                            throw new HyracksDataException(e);
+                            throw HyracksDataException.create(e);
                         }
                     }
                 };
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 3957c06..f2deb74 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -72,8 +72,8 @@
             @Override
             public void close() throws HyracksDataException {
                 try {
-                    INcApplicationContext appCtx = (INcApplicationContext) ctx.getJobletContext()
-                            .getServiceContext().getApplicationContext();
+                    INcApplicationContext appCtx =
+                            (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
                     IDatasetLifecycleManager datasetLifeCycleManager = appCtx.getDatasetLifecycleManager();
                     ILockManager lockManager = appCtx.getTransactionSubsystem().getLockManager();
                     ITransactionManager txnManager = appCtx.getTransactionSubsystem().getTransactionManager();
@@ -84,7 +84,7 @@
                     // flush the dataset synchronously
                     datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
                 } catch (ACIDException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index b835b3a..b7a4c14 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -65,12 +65,12 @@
                 ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToCC(msg);
                 reponse = resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
-                    throw new HyracksDataException(reponse.getException().getMessage());
+                    throw HyracksDataException.create(reponse.getException());
                 }
             }
             return reponse.getResourceId();
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index ab7d657..b22a257 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -395,10 +395,11 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Registering intention to remove node id " + nodeId);
         }
-        if (!activeNcConfiguration.containsKey(nodeId)) {
+        if (activeNcConfiguration.containsKey(nodeId)) {
+            pendingRemoval.add(nodeId);
+        } else {
             LOGGER.warning("Cannot register unknown node " + nodeId + " for pending removal");
         }
-        pendingRemoval.add(nodeId);
     }
 
     public synchronized boolean cancelRemovePending(String nodeId) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index 367616e..f76cb89 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -46,8 +46,8 @@
     private int pkHash;
 
     public LockThenSearchOperationCallback(DatasetId datasetId, int[] entityIdFields,
-            ITransactionSubsystem txnSubsystem,
-            ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) {
+            ITransactionSubsystem txnSubsystem, ITransactionContext txnCtx,
+            IOperatorNodePushable operatorNodePushable) {
         super(datasetId, entityIdFields, txnCtx, txnSubsystem.getLockManager());
         this.operatorNodePushable = (LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;
         this.logManager = txnSubsystem.getLogManager();
@@ -118,7 +118,7 @@
                 lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
             }
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index e8de90d..b13a08e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -56,7 +56,7 @@
         try {
             lockManager.lock(datasetId, pkHash, LockMode.S, txnCtx);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -70,7 +70,7 @@
         try {
             lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -80,7 +80,7 @@
         try {
             lockManager.unlock(datasetId, pkHash, LockMode.S, txnCtx);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 9e96fbb..a6cb61c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -89,7 +89,7 @@
                 lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
             }
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -99,7 +99,7 @@
             int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
             log(pkHash, after, before);
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 5527f47..932c925 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -74,7 +74,7 @@
             txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 8c5b099..b339d27 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -70,7 +70,7 @@
             txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index a27f987..f897aca 100644
--- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -20,11 +20,11 @@
 
 import static org.mockito.Mockito.mock;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.api.ThreadExecutor;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.io.IIOManager;
@@ -35,11 +35,11 @@
 
 class TestRuntimeContextProvider implements IAppRuntimeContextProvider {
 
-    ThreadExecutor ate = new ThreadExecutor(Executors.defaultThreadFactory());
+    ExecutorService ate = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
     IDatasetLifecycleManager dlcm = mock(IDatasetLifecycleManager.class);
 
     @Override
-    public ThreadExecutor getThreadExecutor() {
+    public ExecutorService getThreadExecutor() {
         return ate;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index 8394057..067579e 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.algebricks.common.constraints;
 
+import java.util.Arrays;
+
 public class AlgebricksAbsolutePartitionConstraint extends AlgebricksPartitionConstraint {
     private final String[] locations;
 
@@ -33,4 +35,10 @@
     public String[] getLocations() {
         return locations;
     }
+
+    @Override
+    public String toString() {
+        return Arrays.toString(locations);
+    }
+
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
index 68274ce..2314f88 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -71,8 +71,8 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
-                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        SplitterMaterializerActivityNode sma =
+                new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
         builder.addActivity(this, sma);
         builder.addSourceEdge(0, sma, 0);
         for (int i = 0; i < outputArity; i++) {
@@ -168,7 +168,7 @@
                                 writers[i].close();
                             } catch (Throwable th) {
                                 if (hde == null) {
-                                    hde = new HyracksDataException(th);
+                                    hde = HyracksDataException.create(th);
                                 } else {
                                     hde.addSuppressed(th);
                                 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 024f6f5..82f403e 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -137,8 +137,8 @@
                     ForwardScriptOutput fso = new ForwardScriptOutput(parser, process.getInputStream());
                     outputPipe = new Thread(fso);
                     outputPipe.start();
-                    DumpInStreamToPrintStream disps = new DumpInStreamToPrintStream(process.getErrorStream(),
-                            System.err);
+                    DumpInStreamToPrintStream disps =
+                            new DumpInStreamToPrintStream(process.getErrorStream(), System.err);
                     dumpStderr = new Thread(disps);
                     dumpStderr.start();
                 } catch (IOException e) {
@@ -174,7 +174,8 @@
                     outputPipe.join();
                     dumpStderr.join();
                 } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
                 }
                 if (ret != 0) {
                     throw new HyracksDataException("Process exit value: " + ret);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index f4c5114..0e70759 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -59,7 +59,7 @@
     public static void delete(File file) throws HyracksDataException {
         try {
             if (file.isDirectory()) {
-                FileUtils.deleteDirectory(file);
+                deleteDirectory(file);
             } else {
                 Files.delete(file.toPath());
             }
@@ -89,4 +89,39 @@
             throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_FILE, e, fileRef.getAbsolutePath());
         }
     }
+
+    public static void deleteDirectory(File directory) throws IOException {
+        if (!directory.exists()) {
+            return;
+        }
+        if (!FileUtils.isSymlink(directory)) {
+            cleanDirectory(directory);
+        }
+        Files.delete(directory.toPath());
+    }
+
+    public static void cleanDirectory(final File directory) throws IOException {
+        final File[] files = verifiedListFiles(directory);
+        for (final File file : files) {
+            delete(file);
+        }
+    }
+
+    private static File[] verifiedListFiles(File directory) throws IOException {
+        if (!directory.exists()) {
+            final String message = directory + " does not exist";
+            throw new IllegalArgumentException(message);
+        }
+
+        if (!directory.isDirectory()) {
+            final String message = directory + " is not a directory";
+            throw new IllegalArgumentException(message);
+        }
+
+        final File[] files = directory.listFiles();
+        if (files == null) { // null if security restricted
+            throw new IOException("Failed to list contents of " + directory);
+        }
+        return files;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
index c9cc71e..2c1ce37 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.api.test;
 
 import java.util.Collection;
+import java.util.Collections;
 
 public class FrameWriterTestUtils {
     public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in the call to the method ";
@@ -32,6 +33,10 @@
         Close
     }
 
+    public static TestFrameWriter create() {
+        return create(Collections.emptyList(), Collections.emptyList(), false);
+    }
+
     public static TestFrameWriter create(Collection<FrameWriterOperation> exceptionThrowingOperations,
             Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
         CountAnswer openAnswer =
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 2685f60..7a9306c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -151,12 +151,18 @@
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
+        LOGGER.log(Level.INFO, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(),
+                exceptions.get(0));
         DatasetJobRecord djr = getDatasetJobRecord(jobId);
+        LOGGER.log(Level.INFO, "Dataset job record is " + djr);
         if (djr != null) {
+            LOGGER.log(Level.INFO, "Setting exceptions in Dataset job record");
             djr.fail(exceptions);
         }
         final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
+        LOGGER.log(Level.INFO, "Job result info is " + jobResultInfo);
         if (jobResultInfo != null) {
+            LOGGER.log(Level.INFO, "Setting exceptions in Job result info");
             jobResultInfo.setException(exceptions.isEmpty() ? null : exceptions.get(0));
         }
         notifyAll();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index f18a917..dbbaf9f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -66,7 +66,6 @@
 import org.apache.hyracks.control.common.job.PartitionState;
 import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
 
-
 public class JobExecutor {
     private static final Logger LOGGER = Logger.getLogger(JobExecutor.class.getName());
 
@@ -190,11 +189,11 @@
 
     private void startRunnableActivityClusters() throws HyracksException {
         Set<TaskCluster> taskClusterRoots = new HashSet<>();
-        findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap()
-                .values());
-        if (LOGGER.isLoggable(Level.FINE)) {
-            LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
-                    + inProgressTaskClusters);
+        findRunnableTaskClusterRoots(taskClusterRoots,
+                jobRun.getActivityClusterGraph().getActivityClusterMap().values());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.log(Level.INFO,
+                    "Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: " + inProgressTaskClusters);
         }
         if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
             ccs.getWorkQueue()
@@ -344,8 +343,8 @@
         for (int i = 0; i < tasks.length; ++i) {
             Task ts = tasks[i];
             TaskId tid = ts.getTaskId();
-            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new TaskAttemptId(new TaskId(tid.getActivityId(),
-                    tid.getPartition()), attempts), ts);
+            TaskAttempt taskAttempt = new TaskAttempt(tcAttempt,
+                    new TaskAttemptId(new TaskId(tid.getActivityId(), tid.getPartition()), attempts), ts);
             taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
             locationMap.put(tid,
                     new PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(), tid.getPartition()));
@@ -496,8 +495,8 @@
         final DeploymentId deploymentId = jobRun.getDeploymentId();
         final JobId jobId = jobRun.getJobId();
         final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
-        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<>(
-                jobRun.getConnectorPolicyMap());
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
+                new HashMap<>(jobRun.getConnectorPolicyMap());
         INodeManager nodeManager = ccs.getNodeManager();
         try {
             byte[] acgBytes = predistributed ? null : JavaSerializationUtils.serialize(acg);
@@ -555,14 +554,14 @@
             }
         }
         final JobId jobId = jobRun.getJobId();
-        LOGGER.fine("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+        LOGGER.info("Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
         INodeManager nodeManager = ccs.getNodeManager();
         for (Map.Entry<String, List<TaskAttemptId>> entry : abortTaskAttemptMap.entrySet()) {
             final NodeControllerState node = nodeManager.getNodeControllerState(entry.getKey());
             final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
             if (node != null) {
-                if (LOGGER.isLoggable(Level.FINE)) {
-                    LOGGER.fine("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Aborting: " + abortTaskAttempts + " at " + entry.getKey());
                 }
                 try {
                     node.getNodeController().abortTasks(jobId, abortTaskAttempts);
@@ -582,6 +581,7 @@
     }
 
     private void abortDoomedTaskClusters() throws HyracksException {
+        LOGGER.log(Level.INFO, "aborting doomed task clusters");
         Set<TaskCluster> doomedTaskClusters = new HashSet<>();
         for (TaskCluster tc : inProgressTaskClusters) {
             // Start search at TCs that produce no outputs (sinks)
@@ -590,6 +590,7 @@
             }
         }
 
+        LOGGER.log(Level.INFO, "number of doomed task clusters found = " + doomedTaskClusters.size());
         for (TaskCluster tc : doomedTaskClusters) {
             TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
             if (tca != null) {
@@ -628,7 +629,7 @@
             if ((maxState == null
                     || (cPolicy.consumerWaitsForProducerToFinish() && maxState != PartitionState.COMMITTED))
                     && findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid), doomedTaskClusters)) {
-                    doomed = true;
+                doomed = true;
             }
         }
         if (doomed) {
@@ -663,28 +664,36 @@
 
     /**
      * Indicates that a single task attempt has encountered a failure.
-     * @param ta Failed Task Attempt
-     * @param exceptions exeptions thrown during the failure
+     *
+     * @param ta
+     *            Failed Task Attempt
+     * @param exceptions
+     *            exeptions thrown during the failure
      */
     public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
         try {
-            LOGGER.fine("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
+            LOGGER.log(Level.INFO, "Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
             TaskCluster tc = ta.getTask().getTaskCluster();
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
             if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
-                LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
+                LOGGER.log(Level.INFO, "Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed");
                 ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
                 abortTaskCluster(lastAttempt, TaskClusterAttempt.TaskClusterStatus.FAILED);
                 abortDoomedTaskClusters();
-                if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts() || isCancelled()) {
+                int maxReattempts = jobRun.getActivityClusterGraph().getMaxReattempts();
+                LOGGER.log(Level.INFO, "Marking TaskAttempt " + ta.getTaskAttemptId()
+                        + " as failed and the number of max re-attempts = " + maxReattempts);
+                if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
+                    LOGGER.log(Level.INFO, "Aborting the job of " + ta.getTaskAttemptId());
                     abortJob(exceptions);
                     return;
                 }
+                LOGGER.log(Level.INFO, "We will try to start runnable activity clusters of " + ta.getTaskAttemptId());
                 startRunnableActivityClusters();
             } else {
-                LOGGER.warning("Ignoring task failure notification: " + taId + " -- Current last attempt = "
-                        + lastAttempt);
+                LOGGER.warning(
+                        "Ignoring task failure notification: " + taId + " -- Current last attempt = " + lastAttempt);
             }
         } catch (Exception e) {
             abortJob(Collections.singletonList(e));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 486e9c6..8f50087 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.control.cc.work;
 
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
@@ -28,6 +30,7 @@
 import org.apache.hyracks.control.cc.job.TaskAttempt;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
+    private static final Logger LOGGER = Logger.getLogger(TaskFailureWork.class.getName());
     private final List<Exception> exceptions;
 
     public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -38,6 +41,7 @@
 
     @Override
     protected void performEvent(TaskAttempt ta) {
+        LOGGER.log(Level.WARNING, "Executing task failure work for " + this, exceptions.get(0));
         IJobManager jobManager = ccs.getJobManager();
         JobRun run = jobManager.get(jobId);
         ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 0189735..b654d44 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -120,7 +120,7 @@
                 manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
             }
         } catch (HyracksException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 92831f4..10c7415 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -229,7 +229,7 @@
         try {
             ((FileHandle) fHandle).close();
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -242,7 +242,7 @@
         try {
             waf = File.createTempFile(prefix, WORKSPACE_FILE_SUFFIX, new File(dev.getMount(), waPath));
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
         return dev.createFileRef(waPath + File.separator + waf.getName());
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 7f5302a..4f5b556 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -28,9 +28,6 @@
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.application.NCServiceContext;
 
-/**
- * @author rico
- */
 public class ApplicationMessageWork extends AbstractWork {
     private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
     private byte[] message;
@@ -63,6 +60,6 @@
 
     @Override
     public String toString() {
-        return getName() + ": nodeID: " + nodeId;
+        return getName() + ": nodeId: " + nodeId;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index f4ee6b0..7728d16 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -18,12 +18,16 @@
  */
 package org.apache.hyracks.control.nc.work;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
 
 public class NotifyTaskCompleteWork extends AbstractWork {
+    private static final Logger LOGGER = Logger.getLogger(NotifyTaskCompleteWork.class.getName());
     private final NodeControllerService ncs;
     private final Task task;
 
@@ -40,8 +44,13 @@
             ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
                     ncs.getId(), taskProfile);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "Failed notifying task complete for " + task.getTaskAttemptId(), e);
         }
         task.getJoblet().removeTask(task);
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + task.getTaskAttemptId();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index fa8ba28..7ed2c09 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -35,7 +35,6 @@
     private final Task task;
     private final JobId jobId;
     private final TaskAttemptId taskId;
-
     private final List<Exception> exceptions;
 
     public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions, JobId jobId,
@@ -49,6 +48,8 @@
 
     @Override
     public void run() {
+        LOGGER.log(Level.WARNING, ncs.getId() + " is sending a notification to cc that task " + taskId + " has failed",
+                exceptions.get(0));
         try {
             IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
             if (dpm != null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
index e5f7d09..0c639db 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/ArrayTupleBuilder.java
@@ -104,7 +104,7 @@
                 fieldData.getDataOutput().writeInt(FrameConstants.FRAME_FIELD_MAGIC);
             }
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
         fEndOffsets[nextField++] = fieldData.getLength();
     }
@@ -139,7 +139,7 @@
         try {
             fieldData.getDataOutput().write(bytes, start, length);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
         fEndOffsets[nextField++] = fieldData.getLength();
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
index 8ee4fa3..16c0a29 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -84,16 +84,16 @@
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        ReplicatorMaterializerActivityNode sma = new ReplicatorMaterializerActivityNode(
-                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        ReplicatorMaterializerActivityNode sma =
+                new ReplicatorMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
         builder.addActivity(this, sma);
         builder.addSourceEdge(0, sma, 0);
         int pipelineOutputIndex = 0;
         int activityId = MATERIALIZE_READER_ACTIVITY_ID;
         for (int i = 0; i < outputArity; i++) {
             if (outputMaterializationFlags[i]) {
-                MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
-                        new ActivityId(odId, activityId++));
+                MaterializeReaderActivityNode mra =
+                        new MaterializeReaderActivityNode(new ActivityId(odId, activityId++));
                 builder.addActivity(this, mra);
                 builder.addBlockingEdge(sma, mra);
                 builder.addTargetEdge(i, mra, 0);
@@ -165,7 +165,7 @@
                                     writers[i].close();
                                 } catch (Throwable th) {
                                     if (hde == null) {
-                                        hde = new HyracksDataException(th);
+                                        hde = HyracksDataException.create(th);
                                     } else {
                                         hde.addSuppressed(th);
                                     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
index 7ff280b..6748a4d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/InputChannelFrameReader.java
@@ -53,7 +53,8 @@
             try {
                 wait();
             } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
+                Thread.currentThread().interrupt();
+                throw HyracksDataException.create(e);
             }
         }
         if (failed) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index 253b3e3..63bb72b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -130,7 +130,7 @@
                     appenders[i].write(pWriters[i], true);
                 } catch (Throwable th) {
                     if (closeException == null) {
-                        closeException = new HyracksDataException(th);
+                        closeException = HyracksDataException.create(th);
                     } else {
                         closeException.addSuppressed(th);
                     }
@@ -139,7 +139,7 @@
                         pWriters[i].close();
                     } catch (Throwable th) {
                         if (closeException == null) {
-                            closeException = new HyracksDataException(th);
+                            closeException = HyracksDataException.create(th);
                         } else {
                             closeException.addSuppressed(th);
                         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
index d748c8e..4246626 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
@@ -95,7 +95,7 @@
                             epWriters[i].close();
                         } catch (Throwable th) {
                             if (closeException == null) {
-                                closeException = new HyracksDataException(th);
+                                closeException = HyracksDataException.create(th);
                             } else {
                                 closeException.addSuppressed(th);
                             }
@@ -129,8 +129,8 @@
             int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         expectedPartitions.set(0, nProducerPartitions);
-        NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
-                expectedPartitions);
+        NonDeterministicChannelReader channelReader =
+                new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions);
         NonDeterministicFrameReader frameReader = new NonDeterministicFrameReader(channelReader);
         return new PartitionCollector(ctx, getConnectorId(), index, expectedPartitions, frameReader, channelReader);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
index a34a322..b94f305 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FrameFileWriterOperatorDescriptor.java
@@ -66,7 +66,7 @@
                 try {
                     out.write(buffer.array());
                 } catch (IOException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
 
@@ -79,7 +79,7 @@
                 try {
                     out.close();
                 } catch (IOException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
         };
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 3a3f414..76ebaa4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -76,8 +76,8 @@
         final FileSplit[] splits = fileSplitProvider.getFileSplits();
         IIOManager ioManager = ctx.getIoManager();
         // Frame accessor
-        final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(
-                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
+        final FrameTupleAccessor frameTupleAccessor =
+                new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
         // Record descriptor
         final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new AbstractUnaryInputSinkOperatorNodePushable() {
@@ -131,7 +131,7 @@
                 try {
                     out.close();
                 } catch (IOException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
         };
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/ReflectionBasedDeserializedMapperFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/ReflectionBasedDeserializedMapperFactory.java
index e7e084c..fa812cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/ReflectionBasedDeserializedMapperFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/map/ReflectionBasedDeserializedMapperFactory.java
@@ -33,10 +33,8 @@
     public IDeserializedMapper createMapper() throws HyracksDataException {
         try {
             return mapperClass.newInstance();
-        } catch (InstantiationException e) {
-            throw new HyracksDataException(e);
-        } catch (IllegalAccessException e) {
-            throw new HyracksDataException(e);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-1.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-1.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
index 2dbcf83..ecee076 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-1.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-1.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
@@ -24,7 +24,6 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -37,7 +36,7 @@
         try {
             return new Mapper().new Context(conf, tid, null, null, null, null, null);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -46,7 +45,7 @@
             TaskAttemptID tid = new TaskAttemptID("", 0, true, partition, 0);
             return new TaskAttemptContext(conf, tid);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-2.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-2.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
index dc6ca4e..96b0a23 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-2.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-2.x/src/main/java/org/apache/hyracks/hdfs/ContextFactory.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -38,7 +37,7 @@
         try {
             return new TaskAttemptContextImpl(conf, tid);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -47,7 +46,7 @@
             TaskAttemptID tid = new TaskAttemptID("", 0, TaskType.REDUCE, partition, 0);
             return new TaskAttemptContextImpl(conf, tid);
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
index 3840167..33b84dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/ConfFactory.java
@@ -39,7 +39,7 @@
             confBytes = bos.toByteArray();
             dos.close();
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -51,7 +51,7 @@
             dis.close();
             return conf;
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index e7a3111..dcf4508 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -124,7 +123,7 @@
                     tupleWriter.close(dos);
                     dos.close();
                 } catch (Exception e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 } finally {
                     Thread.currentThread().setContextClassLoader(ctxCL);
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
index 927eb54..5cc09b7 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/InputSplitsFactory.java
@@ -103,7 +103,7 @@
             dis.close();
             return splits;
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
index 0e49fef..b02a97b 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/ConfFactory.java
@@ -25,7 +25,6 @@
 import java.io.Serializable;
 
 import org.apache.hadoop.mapreduce.Job;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ConfFactory implements Serializable {
@@ -40,7 +39,7 @@
             confBytes = bos.toByteArray();
             dos.close();
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -52,7 +51,7 @@
             dis.close();
             return conf;
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
index ef95ee8..682b49a 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/FileSplitsFactory.java
@@ -30,7 +30,6 @@
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 @SuppressWarnings("rawtypes")
@@ -96,7 +95,7 @@
             ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             DataInputStream dis = new DataInputStream(bis);
             int size = dis.readInt();
-            List<FileSplit> splits = new ArrayList<FileSplit>();
+            List<FileSplit> splits = new ArrayList<>();
             for (int i = 0; i < size; i++) {
                 splits.add((FileSplit) defaultConstructor.newInstance());
                 splits.get(i).readFields(dis);
@@ -104,7 +103,7 @@
             dis.close();
             return splits;
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index c27e4ec..1f163ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -123,7 +122,7 @@
                     tupleWriter.close(dos);
                     dos.close();
                 } catch (Exception e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 } finally {
                     Thread.currentThread().setContextClassLoader(ctxCL);
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index d9ab210..dea48bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -74,6 +74,7 @@
         } catch (IOException e) {
             throw new IPCException(e);
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             throw new IPCException(e);
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index b358f07..07d07c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -215,7 +215,7 @@
                     }
                 } catch (Throwable th) {
                     writer.fail();
-                    closeException = new HyracksDataException(th);
+                    closeException = HyracksDataException.create(th);
                 }
             }
 
@@ -223,7 +223,7 @@
                 cursor.close();
             } catch (Throwable th) {
                 if (closeException == null) {
-                    closeException = new HyracksDataException(th);
+                    closeException = HyracksDataException.create(th);
                 } else {
                     closeException.addSuppressed(th);
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
index a19e69a..157450a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.storage.am.common.freepage;
 
+import java.nio.charset.StandardCharsets;
+
 import org.apache.hyracks.data.std.api.IValueReference;
 
 public class MutableArrayValueReference implements IValueReference {
@@ -46,4 +48,9 @@
         return array == null ? 0 : array.length;
     }
 
+    @Override
+    public String toString() {
+        return new String(array, StandardCharsets.UTF_8);
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index eb8ec92..33bb60e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -63,4 +63,9 @@
     public int getFileReferenceCount() {
         return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
index 57b9092..0ba7c30 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
@@ -74,4 +74,8 @@
         return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
     }
 
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + btree.getFileReference().getRelativePath();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 8ff907a..b1005bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -221,6 +221,7 @@
                      * See PrefixMergePolicy.isMergeLagging() for more details.
                      */
                     if (opType == LSMOperationType.FLUSH) {
+                        opTracker.notifyAll();
                         while (mergePolicy.isMergeLagging(lsmIndex)) {
                             try {
                                 opTracker.wait();
@@ -672,16 +673,27 @@
     }
 
     /***
-     * Ensures the index is in a modifiable state
-     * @throws HyracksDataException if the index is not in a modifiable state
+     * Ensures the index is in a modifiable state (no failed flushes)
+     *
+     * @throws HyracksDataException
+     *             if the index is not in a modifiable state
      */
     private void ensureIndexModifiable() throws HyracksDataException {
+        // if current memory component has a flush request, it means that flush didn't start for it
+        if (lsmIndex.hasFlushRequestForCurrentMutableComponent()) {
+            return;
+        }
         // find if there is any memory component which is in a writable state or eventually will be in a writable state
         for (ILSMMemoryComponent memoryComponent : lsmIndex.getMemoryComponents()) {
             switch (memoryComponent.getState()) {
                 case INACTIVE:
+                    // will be activated on next modification
+                case UNREADABLE_UNWRITABLE:
+                    // flush completed successfully but readers are still inside
                 case READABLE_WRITABLE:
+                    // writable
                 case READABLE_UNWRITABLE_FLUSHING:
+                    // flush is ongoing
                     return;
                 default:
                     // continue to the next component
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index 6ccbc8d..40017d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.storage.am.lsm.common.utils;
 
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
@@ -32,8 +34,8 @@
 
 public class ComponentMetadataUtil {
 
-    public static final MutableArrayValueReference MARKER_LSN_KEY =
-            new MutableArrayValueReference("Marker".getBytes());
+    private static final Logger LOGGER = Logger.getLogger(ComponentMetadataUtil.class.getName());
+    public static final MutableArrayValueReference MARKER_LSN_KEY = new MutableArrayValueReference("Marker".getBytes());
     public static final long NOT_FOUND = -1L;
 
     private ComponentMetadataUtil() {
@@ -71,16 +73,28 @@
      * @throws HyracksDataException
      */
     public static void get(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException {
+        LOGGER.log(Level.INFO, "Getting " + key + " from index " + index);
         // Lock the opTracker to ensure index components don't change
         synchronized (index.getOperationTracker()) {
             index.getCurrentMemoryComponent().getMetadata().get(key, pointable);
             if (pointable.getLength() == 0) {
+                LOGGER.log(Level.INFO, key + " was not found in mutable memory component of " + index);
                 // was not found in the in current mutable component, search in the other in memory components
                 fromImmutableMemoryComponents(index, key, pointable);
                 if (pointable.getLength() == 0) {
+                    LOGGER.log(Level.INFO, key + " was not found in all immmutable memory components of " + index);
                     // was not found in the in all in memory components, search in the disk components
                     fromDiskComponents(index, key, pointable);
+                    if (pointable.getLength() == 0) {
+                        LOGGER.log(Level.INFO, key + " was not found in all disk components of " + index);
+                    } else {
+                        LOGGER.log(Level.INFO, key + " was found in disk components of " + index);
+                    }
+                } else {
+                    LOGGER.log(Level.INFO, key + " was found in the immutable memory components of " + index);
                 }
+            } else {
+                LOGGER.log(Level.INFO, key + " was found in mutable memory component of " + index);
             }
         }
     }
@@ -105,7 +119,9 @@
 
     private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable)
             throws HyracksDataException {
+        LOGGER.log(Level.INFO, "Getting " + key + " from disk components of " + index);
         for (ILSMDiskComponent c : index.getImmutableComponents()) {
+            LOGGER.log(Level.INFO, "Getting " + key + " from disk components " + c);
             c.getMetadata().get(key, pointable);
             if (pointable.getLength() != 0) {
                 // Found
@@ -115,10 +131,13 @@
     }
 
     private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, IPointable pointable) {
+        LOGGER.log(Level.INFO, "Getting " + key + " from immutable memory components of " + index);
         List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
         int numOtherMemComponents = memComponents.size() - 1;
         int next = index.getCurrentMemoryComponentIndex();
+        LOGGER.log(Level.INFO, index + " has " + numOtherMemComponents + " immutable memory components");
         for (int i = 0; i < numOtherMemComponents; i++) {
+            LOGGER.log(Level.INFO, "trying to get " + key + " from immutable memory components number: " + (i + 1));
             next = next - 1;
             if (next < 0) {
                 next = memComponents.size() - 1;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index f2b3284..2470a39 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -75,4 +75,9 @@
     public int getFileReferenceCount() {
         return deletedKeysBTree.getBufferCache().getFileReferenceCount(deletedKeysBTree.getFileId());
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + ((OnDiskInvertedIndex) invIndex).getInvListsFile().getRelativePath();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index d89a31a..9332302 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -43,8 +43,8 @@
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
-import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInPlaceInvertedIndex;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
 import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
@@ -292,7 +292,7 @@
                 output.writeInt(invListBuilder.getListSize());
                 btreeTupleBuilder.addFieldEndOffset();
             } catch (IOException e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
             // Reset tuple reference and add it into the BTree load.
             btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 982f89b..54ef122 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -76,4 +76,9 @@
     public int getFileReferenceCount() {
         return rtree.getBufferCache().getFileReferenceCount(rtree.getFileId());
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + ":" + rtree.getFileReference().getRelativePath();
+    }
 }