diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index 1f3daac..26e7fd5 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -52,7 +52,7 @@
      */
     SUSPENDING,
     /**
-     * The activitiy has been suspended successfully. Next state must be resuming
+     * The activity has been suspended successfully. Next state must be resuming
      */
     SUSPENDED,
     /**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 276a04b..0f5dec7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -85,7 +86,7 @@
     protected final MetadataProvider metadataProvider;
     protected final IHyracksClientConnection hcc;
     protected final EntityId entityId;
-    private final List<Dataset> datasets;
+    private final Set<Dataset> datasets;
     protected final ActiveEvent statsUpdatedEvent;
     protected final String runtimeName;
     protected final IRetryPolicyFactory retryPolicyFactory;
@@ -118,7 +119,7 @@
         this.metadataProvider = MetadataProvider.create(appCtx, null);
         this.hcc = hcc;
         this.entityId = entityId;
-        this.datasets = datasets;
+        this.datasets = new HashSet<>(datasets);
         this.retryPolicyFactory = retryPolicyFactory;
         this.state = ActivityState.STOPPED;
         this.statsTimestamp = -1;
@@ -251,19 +252,19 @@
     }
 
     @Override
-    public synchronized void remove(Dataset dataset) throws HyracksDataException {
+    public synchronized boolean remove(Dataset dataset) throws HyracksDataException {
         if (isActive()) {
             throw new RuntimeDataException(ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY, entityId, state);
         }
-        getDatasets().remove(dataset);
+        return getDatasets().remove(dataset);
     }
 
     @Override
-    public synchronized void add(Dataset dataset) throws HyracksDataException {
+    public synchronized boolean add(Dataset dataset) throws HyracksDataException {
         if (isActive()) {
             throw new RuntimeDataException(ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY, entityId, state);
         }
-        getDatasets().add(dataset);
+        return getDatasets().add(dataset);
     }
 
     public JobId getJobId() {
@@ -676,7 +677,7 @@
     }
 
     @Override
-    public List<Dataset> getDatasets() {
+    public Set<Dataset> getDatasets() {
         return datasets;
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 0a7cad6..8b74e07 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
@@ -287,7 +288,7 @@
             }
             LOGGER.log(level, "Acquiring locks");
             lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName);
-            List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
+            Set<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets();
             for (Dataset dataset : datasets) {
                 if (targetDataset != null && targetDataset.equals(dataset)) {
                     // DDL operation already acquired the proper lock for the operation
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index fe4cdc5..a18cd45 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -65,10 +65,11 @@
     }
 
     @Override
-    public synchronized void remove(Dataset dataset) throws HyracksDataException {
+    public synchronized boolean remove(Dataset dataset) throws HyracksDataException {
         super.remove(dataset);
         feedConnections.removeIf(o -> o.getDataverseName().equals(dataset.getDataverseName())
                 && o.getDatasetName().equals(dataset.getDatasetName()));
+        return false;
     }
 
     public synchronized void addFeedConnection(FeedConnection feedConnection) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
index 1e2a795..bc2c110 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.test.active;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Semaphore;
 
@@ -54,7 +55,7 @@
                 String entityName = actionListener.getEntityId().getEntityName();
                 try {
                     lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
-                    List<Dataset> datasets = actionListener.getDatasets();
+                    Collection<Dataset> datasets = actionListener.getDatasets();
                     for (Dataset dataset : datasets) {
                         lockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataset.getDataverseName(),
                                 dataset.getDatasetName());
@@ -77,7 +78,7 @@
                 String entityName = actionListener.getEntityId().getEntityName();
                 try {
                     lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
-                    List<Dataset> datasets = actionListener.getDatasets();
+                    Collection<Dataset> datasets = actionListener.getDatasets();
                     for (Dataset dataset : datasets) {
                         lockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataset.getDataverseName(),
                                 dataset.getDatasetName());
@@ -98,7 +99,7 @@
             protected void doExecute(MetadataProvider mdProvider) throws Exception {
                 DataverseName dataverseName = actionListener.getEntityId().getDataverseName();
                 String entityName = actionListener.getEntityId().getEntityName();
-                List<Dataset> datasets = actionListener.getDatasets();
+                Collection<Dataset> datasets = actionListener.getDatasets();
                 try {
                     lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
                     for (Dataset dataset : datasets) {
@@ -125,7 +126,7 @@
                 String entityName = actionListener.getEntityId().getEntityName();
                 try {
                     lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName);
-                    List<Dataset> datasets = actionListener.getDatasets();
+                    Collection<Dataset> datasets = actionListener.getDatasets();
                     for (Dataset dataset : datasets) {
                         lockManager.upgradeDatasetLockToWrite(mdProvider.getLocks(), dataset.getDataverseName(),
                                 dataset.getDatasetName());
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
index baface2..c355b43 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.metadata.api;
 
-import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -76,30 +76,32 @@
      *
      * @param dataset
      *            the dataset to add
+     * @return <code>true</code> if the active entity did not already contain the dataset
      * @throws HyracksDataException
      *             if the entity is active
      */
-    void add(Dataset dataset) throws HyracksDataException;
+    boolean add(Dataset dataset) throws HyracksDataException;
 
     /**
      * Remove dataset from the list of associated datasets
      *
      * @param dataset
      *            the dataset to add
+     * @return <code>true</code> if the active entity contained the dataset
      * @throws HyracksDataException
      *             if the entity is active
      */
-    void remove(Dataset dataset) throws HyracksDataException;
+    boolean remove(Dataset dataset) throws HyracksDataException;
 
     /**
      * @return the list of associated datasets
      */
-    List<Dataset> getDatasets();
+    Set<Dataset> getDatasets();
 
     /**
      * replace the dataset object with the passed updated object
      *
-     * @param target
+     * @param dataset
      */
     void replace(Dataset dataset);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 46a9b66..7b9f632 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Objects;
 
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
@@ -154,11 +153,6 @@
         return msgCache;
     }
 
-    public boolean matches(String component, int errorCode) {
-        Objects.requireNonNull(component, "component");
-        return component.equals(this.component) && errorCode == this.errorCode;
-    }
-
     @Override
     public String toString() {
         return getLocalizedMessage();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java
index 0f873e1..f8b017b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.api.exceptions;
 
+import java.util.Objects;
+
 public interface IFormattedException {
 
     /**
@@ -40,4 +42,27 @@
      * @return the exception message
      */
     String getMessage();
+
+    /**
+     * Tests for matching component & errorCode against this exception
+     *
+     * @param component the component to match
+     * @param errorCode the errorCode to match
+     * @return <code>true</code> if this {@link IFormattedException} instance matches the supplied parameters
+     */
+    default boolean matches(String component, int errorCode) {
+        Objects.requireNonNull(component, "component");
+        return component.equals(getComponent()) && errorCode == getErrorCode();
+    }
+
+    /**
+     * Tests for matching component & errorCode against supplied throwable
+     *
+     * @param component the component to match
+     * @param errorCode the errorCode to match
+     * @return <code>true</code> if the supplied {@link Throwable}  matches the supplied parameters
+     */
+    static boolean matches(Throwable th, String component, int errorCode) {
+        return th instanceof IFormattedException && ((IFormattedException) th).matches(component, errorCode);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 5d52ed9..6ecb677 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -270,7 +270,7 @@
             IDelay delay, IFailedAttemptCallback onFailure) throws HyracksDataException {
         Throwable failure;
         int attempt = 0;
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
             attempt++;
             try {
                 return action.compute();
@@ -291,6 +291,7 @@
                 }
             }
         }
+        throw HyracksDataException.create(new InterruptedException());
     }
 
     @FunctionalInterface
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index cefada7..1cfc14c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -70,7 +70,7 @@
     @Override
     public int getTupleStartOffset(int tupleIndex) {
         int offset = tupleIndex == 0 ? FrameConstants.TUPLE_START_OFFSET
-                : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
+                : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * tupleIndex);
         return start + offset;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
index be9874a..4f73680 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java
@@ -40,4 +40,10 @@
         };
     }
 
+    static <R> ThrowingFunction<R, Void> asFunction(ThrowingConsumer<R> consumer) {
+        return input -> {
+            consumer.process(input);
+            return null;
+        };
+    }
 }
