Merge branch 'gerrit/trinity' into 'master'

Change-Id: Ia4c6bdfa9dce9437049d59e137494b2372348754
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 41f2a53..b2134dc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -67,6 +67,9 @@
             CancelQueryRequest cancelQueryMessage = new CancelQueryRequest(serviceCtx.getNodeId(),
                     cancelQueryFuture.getFutureId(), uuid, clientContextId);
             // TODO(mblow): multicc -- need to send cancellation to the correct cc
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Sending CancelQueryRequest with uuid:{}, clientContextID:{}", uuid, clientContextId);
+            }
             messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
             CancelQueryResponse cancelResponse =
                     (CancelQueryResponse) cancelQueryFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index d2d7a9b..695956c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -165,7 +165,8 @@
             CancelQueryRequest cancelQueryMessage =
                     new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), uuid, clientContextID);
             // TODO(mblow): multicc -- need to send cancellation to the correct cc
-            LOGGER.info("Cancelling query due to {}", exception.getClass().getSimpleName());
+            LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due to {}", uuid, clientContextID,
+                    exception.getClass().getSimpleName());
             messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
             if (wait) {
                 cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index ddd3d64..3fd339e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -195,12 +195,12 @@
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "the job {} finished", jobId);
+            LOGGER.log(level, "Active job {} finished", jobId);
         }
         JobId lastJobId = jobId;
         if (numRegistered != numDeRegistered) {
             LOGGER.log(Level.WARN,
-                    "the job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId,
+                    "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId,
                     numRegistered, numDeRegistered);
         }
         jobId = null;
@@ -208,7 +208,7 @@
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "The job finished with status: {}", jobStatus);
+            LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus);
         }
         if (!jobSuccessfullyTerminated(jobStatus)) {
             jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
@@ -372,7 +372,7 @@
     @Override
     public synchronized void recover() {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Recover is called on " + entityId);
+            LOGGER.log(level, "Recover is called on {}", entityId);
         }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
             LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure");
@@ -456,7 +456,7 @@
         try {
             metadataProvider.getApplicationContext().getHcc().cancelJob(jobId);
         } catch (Throwable th) {
-            LOGGER.warn("Failed to cancel active job", th);
+            LOGGER.warn("Failed to cancel active job {}", jobId, th);
             e.addSuppressed(th);
         }
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 284929f..6b3581e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -66,22 +66,24 @@
 
     @Override
     protected void handle(ActiveEvent event) {
-        EntityId entityId = jobId2EntityId.get(event.getJobId());
+        JobId jobId = event.getJobId();
+        Kind eventKind = event.getEventKind();
+        EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Next event is of type " + event.getEventKind());
+                LOGGER.log(level, "Next event is {} for job {}", eventKind, jobId);
             }
-            if (event.getEventKind() == Kind.JOB_FINISHED) {
-                LOGGER.log(level, "Removing the job");
-                jobId2EntityId.remove(event.getJobId());
+            if (eventKind == Kind.JOB_FINISHED) {
+                LOGGER.log(level, "Removing job {}", jobId);
+                jobId2EntityId.remove(jobId);
             }
             if (listener != null) {
                 LOGGER.log(level, "Notifying the listener");
                 listener.notify(event);
             }
         } else {
-            LOGGER.log(Level.ERROR, "Entity not found for received message for job " + event.getJobId());
+            LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId);
         }
     }
 
@@ -90,45 +92,43 @@
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level,
-                    "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = "
-                            + jobId);
+            LOGGER.log(level, "notifyJobCreation was called for job {}", jobId);
         }
         Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (!(property instanceof EntityId)) {
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Job is not of type active job. property found to be: " + property);
+                LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property);
             }
             return;
         }
         EntityId entityId = (EntityId) property;
         monitorJob(jobId, entityId);
         boolean found = jobId2EntityId.get(jobId) != null;
-        LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive"));
+        LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive"));
         add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification));
     }
 
     private synchronized void monitorJob(JobId jobId, EntityId entityId) {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId);
+            LOGGER.log(level, "monitorJob was called for job {}", jobId);
         }
         boolean found = jobId2EntityId.get(jobId) != null;
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive"));
+            LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive"));
         }
         if (entityEventListeners.containsKey(entityId)) {
             if (jobId2EntityId.containsKey(jobId)) {
                 if (LOGGER.isErrorEnabled()) {
-                    LOGGER.error("Job is already being monitored for job: " + jobId);
+                    LOGGER.error("Job {} is already being monitored", jobId);
                 }
                 return;
             }
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "monitoring started for job id: " + jobId);
+                LOGGER.log(level, "Monitoring started for job {}", jobId);
             }
         } else {
             if (LOGGER.isEnabled(level)) {
-                LOGGER.info("No listener was found for the entity: " + entityId);
+                LOGGER.info("No listener was found for the entity {} for job {}", entityId, jobId);
             }
         }
         jobId2EntityId.put(jobId, entityId);
@@ -146,14 +146,14 @@
     public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions)
             throws HyracksException {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Getting notified of job finish for JobId: " + jobId);
+            LOGGER.log(level, "Getting notified of job finish for job {}", jobId);
         }
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions)));
         } else {
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "no need to notify job finish");
+                LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", jobId);
             }
         }
     }
@@ -169,11 +169,11 @@
     @Override
     public IActiveEntityEventsListener getListener(EntityId entityId) {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId);
+            LOGGER.log(level, "getActiveEntityListener was called with entity {}", entityId);
         }
         IActiveEntityEventsListener listener = entityEventListeners.get(entityId);
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Listener found: " + listener);
+            LOGGER.log(level, "Listener found: {}", listener);
         }
         return entityEventListeners.get(entityId);
     }
@@ -192,8 +192,7 @@
             throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "registerListener(IActiveEntityEventsListener listener) was called for the entity "
-                    + listener.getEntityId());
+            LOGGER.log(level, "registerListener was called for the entity {}", listener.getEntityId());
         }
         if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId());
@@ -207,8 +206,7 @@
             throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
-                    + listener.getEntityId());
+            LOGGER.log(level, "unregisterListener was called for the entity {}", listener.getEntityId());
         }
         IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId());
         if (registeredListener == null) {
@@ -226,7 +224,7 @@
         for (IActiveEntityEventsListener listener : getEventListeners()) {
             synchronized (listener) {
                 if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getState());
+                    LOGGER.log(level, "Entity {} is {}", listener.getEntityId(), listener.getState());
                 }
                 listener.notifyAll();
             }
@@ -276,11 +274,11 @@
     public void resumeOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider) {
         try {
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Resuming " + listener.getEntityId());
+                LOGGER.log(level, "Resuming {}", listener.getEntityId());
             }
             ((ActiveEntityEventsListener) listener).resume(metadataProvider);
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, listener.getEntityId() + " resumed");
+                LOGGER.log(level, "{} resumed", listener.getEntityId());
             }
         } catch (Throwable th) { // NOSONAR must halt in case of any failure
             LOGGER.error("Resume active failed", th);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 6154faa..65d1039 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -50,6 +50,7 @@
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         IClientRequest req = uuid != null ? requestTracker.get(uuid) : requestTracker.getByClientContextId(contextId);
         RequestStatus status;
+        String requestId = "";
 
         if (req == null) {
             LOGGER.log(Level.INFO, "No request found for uuid {} or context id {}", uuid, contextId);
@@ -59,7 +60,8 @@
                 status = RequestStatus.REJECTED;
             } else {
                 try {
-                    requestTracker.cancel(req.getId());
+                    requestId = req.getId();
+                    requestTracker.cancel(requestId);
                     status = RequestStatus.SUCCESS;
                 } catch (Exception e) {
                     LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
@@ -67,6 +69,10 @@
                 }
             }
         }
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, uuid:{}, contextId:{}, status:{}", nodeId,
+                    requestId, uuid, contextId, status);
+        }
         CancelQueryResponse response = new CancelQueryResponse(reqId, status);
         CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker();
         try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 0a5e033..05bc87b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -243,7 +243,7 @@
 
     @Override
     public String toString() {
-        return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId,
-                LogRedactionUtil.statement(statementsText));
+        return String.format("%s(id=%s, from=%s, uuid=%s): %s", getClass().getSimpleName(), requestMessageId,
+                requestNodeId, requestReference.getUuid(), LogRedactionUtil.statement(statementsText));
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 9802001..bb689ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -41,6 +41,8 @@
     @Override
     public void operationFailed(ILSMIOOperation operation, Throwable t) {
         LOGGER.error("Operation {} has failed", operation, t);
-        ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+        if (operation.getIOOpertionType() != ILSMIOOperation.LSMIOOperationType.REPLICATE) {
+            ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+        }
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 56f794b..5d80771 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -492,18 +492,10 @@
             final INetworkSecurityManager networkSecurityManager =
                     ncServiceContext.getControllerService().getNetworkSecurityManager();
 
-            // clients need to have the client factory on their classpath- to enable older clients, only use
-            // our client socket factory when SSL is enabled
-            if (networkSecurityManager.getConfiguration().isSslEnabled()) {
-                final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
-                final RMIClientFactory clientSocketFactory =
-                        new RMIClientFactory(networkSecurityManager.getConfiguration());
-                metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
-                        getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory);
-            } else {
-                metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
-                        getMetadataProperties().getMetadataPort());
-            }
+            metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+                    getMetadataProperties().getMetadataPort(),
+                    RMIClientFactory.getSocketFactory(networkSecurityManager),
+                    RMIServerFactory.getSocketFactory(networkSecurityManager));
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 21ce833..d41b549 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4813,6 +4813,10 @@
                 }
             }
             jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("createAndRunJob jobId:{}, uuid:{}", jobId,
+                        requestParameters.getRequestReference().getUuid());
+            }
             clientRequest.setJobId(jobId);
             if (jId != null) {
                 jId.setValue(jobId);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
index e8683c9..c366977 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
@@ -30,7 +30,7 @@
 public interface IDatasetRebalanceCallback {
 
     /**
-     * The action to perform before the target dataset is populated.
+     * The check to perform before the target dataset is populated.
      *
      * @param metadataProvider,
      *            the metadata provider.
@@ -40,9 +40,13 @@
      *            the target dataset.
      * @param hcc,
      *            the hyracks client connection.
+     *
+     * @return <code>true</code> if the rebalance of the dataset should proceed, otherwise <code>false</code> to skip.
+     *         If the dataset is skipped, the active metadata transaction context, if any, can be expected to be aborted.
+     *
      * @throws HyracksDataException
      */
-    void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+    boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
             IHyracksClientConnection hcc) throws HyracksDataException;
 
     /**
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
index 680adbf..7085567 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
@@ -33,9 +33,10 @@
     }
 
     @Override
-    public void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
+    public boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target,
             IHyracksClientConnection hcc) {
         // Does nothing.
+        return true;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 1b0b4fd..837d2cb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -20,6 +20,8 @@
 
 import static org.apache.asterix.app.translator.QueryTranslator.abort;
 import static org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
+import static org.apache.asterix.metadata.utils.DatasetUtil.getFullyQualifiedDisplayName;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
@@ -88,13 +90,15 @@
      * @param targetNcNames,    the list of target nodes.
      * @param metadataProvider, the metadata provider.
      * @param hcc,              the reusable hyracks connection.
+     * @return <code>false</code> if the rebalance was safely skipped, otherwise <code>true</code>
      * @throws Exception
      */
-    public static void rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
+    public static boolean rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc,
             IDatasetRebalanceCallback datasetRebalanceCallback, boolean forceRebalance) throws Exception {
         Dataset sourceDataset;
         Dataset targetDataset;
+        boolean success = true;
         // Executes the first Metadata transaction.
         // Generates the rebalance target files. While doing that, hold read locks on the dataset so
         // that no one can drop the rebalance source dataset.
@@ -106,13 +110,13 @@
 
             // If the source dataset doesn't exist, then it's a no-op.
             if (sourceDataset == null) {
-                return;
+                return true;
             }
 
             Set<String> sourceNodes = new HashSet<>(metadataProvider.findNodes(sourceDataset.getNodeGroupName()));
 
             if (!forceRebalance && sourceNodes.equals(targetNcNames)) {
-                return;
+                return true;
             }
 
             if (!targetNcNames.isEmpty()) {
@@ -123,20 +127,25 @@
                 // The target dataset for rebalance.
                 targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
 
-                LOGGER.info("Rebalancing dataset {} from node group {} with nodes {} to node group {} with nodes {}",
-                        sourceDataset.getDatasetName(), sourceDataset.getNodeGroupName(), sourceNodes,
-                        targetDataset.getNodeGroupName(), targetNcNames);
+                LOGGER.info("Rebalancing {} {} from node group {} with nodes {} to node group {} with nodes {}",
+                        dataset(), getFullyQualifiedDisplayName(sourceDataset), sourceDataset.getNodeGroupName(),
+                        sourceNodes, targetDataset.getNodeGroupName(), targetNcNames);
                 // Rebalances the source dataset into the target dataset.
                 if (sourceDataset.getDatasetType() != DatasetType.EXTERNAL) {
-                    rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
+                    success = rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback);
                 }
             } else {
                 targetDataset = null;
                 // if this the last NC in the cluster, just drop the dataset
                 purgeDataset(sourceDataset, metadataProvider, hcc);
             }
-            // Complete the metadata transaction.
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            if (success) {
+                // Complete the metadata transaction.
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } else {
+                // Abort the metadata transaction, since the rebalance of the dataset was skipped
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+            }
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -144,7 +153,10 @@
 
         if (targetNcNames.isEmpty()) {
             // Nothing else to do since the dataset was dropped.
-            return;
+            return true;
+        } else if (!success) {
+            LOGGER.info("Dataset {} rebalance was skipped, see above log for reason", datasetName);
+            return false;
         }
         // Up to this point, since the bulk part of a rebalance operation is done,
         // the following two operations will retry after interrupt and finally rethrow InterruptedException,
@@ -163,6 +175,7 @@
             runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc));
         });
         LOGGER.info("Dataset {} rebalance completed successfully", datasetName);
+        return true;
     }
 
     @FunctionalInterface
@@ -212,13 +225,16 @@
     }
 
     // Rebalances from the source to the target.
-    private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
+    private static boolean rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception {
         // Drops the target dataset files (if any) to make rebalance idempotent.
         dropDatasetFiles(target, metadataProvider, hcc);
 
         // Performs the specified operation before the target dataset is populated.
-        datasetRebalanceCallback.beforeRebalance(metadataProvider, source, target, hcc);
+        if (!datasetRebalanceCallback.canRebalance(metadataProvider, source, target, hcc)) {
+            // the callback indicates that this rebalance should be skipped; short circuit the remaining steps
+            return false;
+        }
 
         // Creates the rebalance target.
         createRebalanceTarget(target, metadataProvider, hcc);
@@ -231,6 +247,8 @@
 
         // Performs the specified operation after the target dataset is populated.
         datasetRebalanceCallback.afterRebalance(metadataProvider, source, target, hcc);
+
+        return true;
     }
 
     // Switches the metadata entity from the source dataset to the target dataset.
diff --git a/asterixdb/asterix-app/src/main/resources/cc-cloud-storage.conf b/asterixdb/asterix-app/src/main/resources/cc-cloud-storage.conf
new file mode 100644
index 0000000..be9bd54
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/resources/cc-cloud-storage.conf
@@ -0,0 +1,68 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+;   http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied.  See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=asterix_nc1/iodevice1
+iodevices=asterix_nc1/iodevice2
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=asterix_nc2/iodevice1
+iodevices=asterix_nc2/iodevice2
+nc.api.port=19005
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.size=128MB
+storage.memorycomponent.globalbudget=512MB
+storage.io.scheduler=greedy
+storage.filtered.memorycomponent.max.size=16MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.cbo=false
+compiler.cbotest=true
+compiler.queryplanshape=zigzag
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+compiler.sort.parallel=false
+compiler.internal.sanitycheck=true
+messaging.frame.size=4096
+messaging.frame.count=512
+cloud.deployment=true
+storage.buffercache.pagesize=32KB
diff --git a/asterixdb/asterix-app/src/main/resources/cc_static_partitioning.conf b/asterixdb/asterix-app/src/main/resources/cc_static_partitioning.conf
index 08d9dfd..9884116 100644
--- a/asterixdb/asterix-app/src/main/resources/cc_static_partitioning.conf
+++ b/asterixdb/asterix-app/src/main/resources/cc_static_partitioning.conf
@@ -51,6 +51,9 @@
 [common]
 log.dir = logs/
 log.level = INFO
+compiler.cbo=false
+compiler.cbotest=true
+compiler.queryplanshape=zigzag
 compiler.framesize=32KB
 compiler.sortmemory=320KB
 compiler.groupmemory=160KB
@@ -62,4 +65,5 @@
 messaging.frame.size=4096
 messaging.frame.count=512
 storage.buffercache.pagesize=32KB
-storage.partitioning=static
\ No newline at end of file
+storage.partitioning=static
+storage.compression.block=snappy
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 091fc20..05b8bbf 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -214,8 +214,8 @@
             result = OBJECT_READER.readValue(resultStr);
         } catch (Exception e) {
             // whoops, not JSON (e.g. 404) - just include the body
-            GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, resultStr);
-            throw new Exception(resultStr);
+            GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, "result: {} json parse exception: {}", resultStr, e);
+            throw new Exception(resultStr, e);
         }
 
         final boolean isJsonFormat = isJsonFormat(fmt);
diff --git a/asterixdb/asterix-app/src/test/resources/cc-analyze.conf b/asterixdb/asterix-app/src/test/resources/cc-analyze.conf
index 9d45ddb..16fa0ae 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-analyze.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-analyze.conf
@@ -53,4 +53,4 @@
 log.dir = logs/
 log.level = INFO
 compiler.groupmemory=64MB
-storage.buffercache.pagesize=32KB
\ No newline at end of file
+storage.buffercache.pagesize=32KB
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
index 7d0b06b..6373eae 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
@@ -62,10 +62,3 @@
 messaging.frame.count=512
 cloud.deployment=true
 storage.buffercache.pagesize=32KB
-storage.partitioning=static
-cloud.storage.scheme=s3
-cloud.storage.bucket=cloud-storage-container
-cloud.storage.region=us-west-2
-cloud.storage.endpoint=http://127.0.0.1:8001
-cloud.storage.anonymous.auth=true
-cloud.storage.cache.policy=lazy
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index ac66f33..070fe65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -162,12 +162,14 @@
     void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException;
 
     /**
-     * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}.
+     * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy} and
+     * {@code partition}.
      *
      * @param replicationStrategy
+     * @param partition
      * @throws HyracksDataException
      */
-    void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+    void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException;
 
     /**
      * @return the current datasets io stats
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 5964bb4..7d3dba4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -33,17 +33,19 @@
     private static final Logger LOGGER = LogManager.getLogger();
     protected final int datasetID;
     protected final DatasetInfo dsInfo;
+    protected final int partition;
 
-    public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
+    public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int partition) {
         this.datasetID = datasetID;
         this.dsInfo = dsInfo;
+        this.partition = partition;
     }
 
     @Override
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.declareActiveIOOperation(REPLICATE);
+            dsInfo.declareActiveIOOperation(REPLICATE, partition);
         }
     }
 
@@ -59,7 +61,7 @@
     public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.undeclareActiveIOOperation(REPLICATE);
+            dsInfo.undeclareActiveIOOperation(REPLICATE, partition);
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index d15d9be..87a3c2f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -33,12 +33,16 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private static final Logger LOGGER = LogManager.getLogger();
     // partition -> index
     private final Map<Integer, Set<IndexInfo>> partitionIndexes;
     // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
+    private final Int2IntMap partitionPendingIO;
     private final int datasetID;
     private final ILogManager logManager;
     private final LogRecord waitLog = new LogRecord();
@@ -54,6 +58,7 @@
     public DatasetInfo(int datasetID, ILogManager logManager) {
         this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
+        this.partitionPendingIO = new Int2IntOpenHashMap();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
         this.setRegistered(false);
@@ -74,7 +79,8 @@
         setLastAccess(System.currentTimeMillis());
     }
 
-    public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+    public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) {
+        partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) + 1);
         numActiveIOOps++;
         switch (opType) {
             case FLUSH:
@@ -91,7 +97,8 @@
         }
     }
 
-    public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+    public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int partition) {
+        partitionPendingIO.put(partition, partitionPendingIO.getOrDefault(partition, 0) - 1);
         numActiveIOOps--;
         switch (opType) {
             case FLUSH:
@@ -253,6 +260,26 @@
         }
     }
 
+    public void waitForIO(int partition) throws HyracksDataException {
+        logManager.log(waitLog);
+        synchronized (this) {
+            while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
+            }
+            if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
+                LOGGER.error("number of IO operations cannot be negative for dataset {}, partition {}", this,
+                        partition);
+                throw new IllegalStateException(
+                        "Number of IO operations cannot be negative: " + this + ", partition " + partition);
+            }
+        }
+    }
+
     public synchronized int getPendingFlushes() {
         return pendingFlushes;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 7a109cc..34f5114 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -559,10 +559,10 @@
     }
 
     @Override
-    public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+    public void waitForIO(IReplicationStrategy replicationStrategy, int partition) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
-                dsr.getDatasetInfo().waitForIO();
+                dsr.getDatasetInfo().waitForIO(partition);
             }
         }
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 3ec2d22..2e32fb3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -58,7 +58,6 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
-import org.apache.hyracks.util.ExitUtil;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -66,7 +65,6 @@
 @NotThreadSafe
 public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final int partition;
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
     private final ILogManager logManager;
@@ -80,8 +78,7 @@
 
     public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
-        super(datasetID, dsInfo);
-        this.partition = partition;
+        super(datasetID, dsInfo, partition);
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
         this.idGenerator = idGenerator;
@@ -161,10 +158,10 @@
                 }
             }
             if (primaryLsmIndex == null) {
-                LOGGER.fatal(
-                        "Primary index not found in dataset {} and partition {} open indexes {}; halting to clear memory state",
+                LOGGER.warn(
+                        "Primary index not found in dataset {} and partition {} open indexes {}; possible secondary index leaked files",
                         dsInfo.getDatasetID(), partition, indexes);
-                ExitUtil.halt(ExitUtil.EC_INCONSISTENT_STORAGE_REFERENCES);
+                return;
             }
             for (ILSMIndex lsmIndex : indexes) {
                 ILSMOperationTracker opTracker = lsmIndex.getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 9ab4848..150d5ee 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -72,6 +72,7 @@
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     protected final DatasetInfo dsInfo;
     protected final ILSMIndex lsmIndex;
+    private final int partition;
     private long firstLsnForCurrentMemoryComponent = 0L;
     private long persistenceLsn = 0L;
     private int pendingFlushes = 0;
@@ -82,6 +83,7 @@
         this.dsInfo = dsInfo;
         this.lsmIndex = lsmIndex;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+        this.partition = ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
         componentIds.add(componentId);
     }
 
@@ -257,7 +259,7 @@
 
     @Override
     public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException {
-        dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
+        dsInfo.declareActiveIOOperation(operation.getIOOpertionType(), partition);
         if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
             pendingFlushes++;
             FlushOperation flush = (FlushOperation) operation;
@@ -280,7 +282,7 @@
                         pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
             }
         }
-        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
+        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(), partition);
     }
 
     public synchronized boolean hasPendingFlush() {
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index db0911b..33d513f 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -77,6 +77,7 @@
         String indexId = "mockIndexId";
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
+        Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         DatasetInfo dsInfo = new DatasetInfo(101, null);
         LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -140,6 +141,7 @@
         ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+        Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -161,6 +163,7 @@
         ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
+        Mockito.when(mockIndex.getIndexIdentifier()).thenReturn(getIndexPath());
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(),
@@ -221,4 +224,8 @@
         Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
         return indexCheckpointManagerProvider;
     }
+
+    private static String getIndexPath() {
+        return "storage/partition_0/dataverse/dataset/0/index";
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
index 515e763..ce459e2 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
@@ -29,6 +29,7 @@
 import javax.net.ssl.SSLSocketFactory;
 
 import org.apache.hyracks.api.network.INetworkSecurityConfig;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
 import org.apache.hyracks.ipc.security.NetworkSecurityManager;
 
 public class RMIClientFactory implements RMIClientSocketFactory, Serializable {
@@ -37,11 +38,21 @@
     private final INetworkSecurityConfig config;
     private transient SocketFactory socketFactory;
 
-    public RMIClientFactory(INetworkSecurityConfig config) {
+    private RMIClientFactory(INetworkSecurityConfig config) {
         this.config = config;
 
     }
 
+    public static RMIClientSocketFactory getSocketFactory(INetworkSecurityManager securityManager) {
+        // clients need to have the client factory on their classpath- to enable older clients, only use
+        // our client socket factory when SSL is enabled
+        INetworkSecurityConfig config = securityManager.getConfiguration();
+        if (config.isSslEnabled()) {
+            return new RMIClientFactory(config);
+        }
+        return null;
+    }
+
     public Socket createSocket(String host, int port) throws IOException {
         synchronized (this) {
             if (socketFactory == null) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIServerFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIServerFactory.java
index 9506c5a..0128a87 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIServerFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIServerFactory.java
@@ -19,8 +19,10 @@
 package org.apache.asterix.metadata;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.rmi.server.RMIServerSocketFactory;
+import java.util.Optional;
 
 import javax.net.ServerSocketFactory;
 
@@ -28,17 +30,34 @@
 
 public class RMIServerFactory implements RMIServerSocketFactory {
 
+    // default backlog used by the JDK (e.g. sun.security.ssl.SSLServerSocketFactoryImpl)
+    private static final int DEFAULT_BACKLOG = 50;
     private final INetworkSecurityManager securityManager;
 
-    public RMIServerFactory(INetworkSecurityManager securityManager) {
+    private RMIServerFactory(INetworkSecurityManager securityManager) {
         this.securityManager = securityManager;
     }
 
+    public static RMIServerSocketFactory getSocketFactory(INetworkSecurityManager securityManager) {
+        if (securityManager.getConfiguration().isSslEnabled()) {
+            return new RMIServerFactory(securityManager);
+        }
+        return null;
+    }
+
     @Override
     public ServerSocket createServerSocket(int port) throws IOException {
+        ServerSocketFactory socketFactory;
         if (securityManager.getConfiguration().isSslEnabled()) {
-            return securityManager.newSSLContext().getServerSocketFactory().createServerSocket(port);
+            socketFactory = securityManager.newSSLContext().getServerSocketFactory();
+        } else {
+            socketFactory = ServerSocketFactory.getDefault();
         }
-        return ServerSocketFactory.getDefault().createServerSocket(port);
+        Optional<InetAddress> rmiBindAddress = securityManager.getConfiguration().getRMIBindAddress();
+        if (rmiBindAddress.isPresent()) {
+            return socketFactory.createServerSocket(port, DEFAULT_BACKLOG, rmiBindAddress.get());
+        } else {
+            return socketFactory.createServerSocket(port);
+        }
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 2104fdf..cedcccf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -43,18 +43,9 @@
 
     public static IAsterixStateProxy registerRemoteObject(INetworkSecurityManager networkSecurityManager,
             int metadataCallbackPort) throws RemoteException {
-        IAsterixStateProxy stub;
-        // clients need to have the client factory on their classpath- to enable older clients, only use
-        // our client socket factory when SSL is enabled
-        if (networkSecurityManager.getConfiguration().isSslEnabled()) {
-            final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
-            final RMIClientFactory clientSocketFactory =
-                    new RMIClientFactory(networkSecurityManager.getConfiguration());
-            stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort, clientSocketFactory,
-                    serverSocketFactory);
-        } else {
-            stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort);
-        }
+        IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort,
+                RMIClientFactory.getSocketFactory(networkSecurityManager),
+                RMIServerFactory.getSocketFactory(networkSecurityManager));
         LOGGER.info("Asterix Distributed State Proxy Bound");
         return stub;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
index a8d027f..a50fd4a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java
@@ -20,6 +20,7 @@
 package org.apache.asterix.metadata.entities;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -42,7 +43,7 @@
         this.nodeNames = nodeNames;
     }
 
-    public static NodeGroup createOrdered(String groupName, List<String> nodeNames) {
+    public static NodeGroup createOrdered(String groupName, Collection<String> nodeNames) {
         List<String> sortedNodeNames = new ArrayList<>(nodeNames);
         Collections.sort(sortedNodeNames);
         return new NodeGroup(groupName, sortedNodeNames);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 0cbdc24..88eafcd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -672,7 +672,7 @@
             nodeGroup = nodeGroup + "_" + UUID.randomUUID();
             appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
         }
-        MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, new ArrayList<>(ncNames)));
+        MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, ncNames));
         return nodeGroup;
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index d803756..f7a739b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -20,7 +20,10 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -29,6 +32,7 @@
 import org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.hyracks.api.network.ISocketChannel;
@@ -41,6 +45,7 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final Set<IPartitionReplica> replicas = new HashSet<>();
     private final InetSocketAddress inputLocation;
+    private final Map<ReplicaIdentifier, ArrayDeque<PartitionReplica>> replicasConnPool = new HashMap<>();
     private InetSocketAddress resolvedLocation;
     private ISocketChannel logRepChannel;
 
@@ -64,6 +69,11 @@
     @Override
     public synchronized void remove(IPartitionReplica replica) {
         replicas.remove(replica);
+        ArrayDeque<PartitionReplica> partitionConnections = replicasConnPool.remove(replica.getIdentifier());
+        if (partitionConnections != null) {
+            partitionConnections.forEach(PartitionReplica::close);
+            partitionConnections.clear();
+        }
     }
 
     @Override
@@ -138,4 +148,26 @@
     public int hashCode() {
         return Objects.hash(inputLocation);
     }
+
+    public synchronized PartitionReplica getPartitionReplicaConnection(ReplicaIdentifier identifier,
+            INcApplicationContext appCtx) {
+        ArrayDeque<PartitionReplica> partitionReplicas =
+                replicasConnPool.computeIfAbsent(identifier, k -> new ArrayDeque<>());
+        if (!partitionReplicas.isEmpty()) {
+            return partitionReplicas.remove();
+        }
+        return new PartitionReplica(identifier, appCtx);
+    }
+
+    public synchronized void recycleConnection(PartitionReplica partitionReplica) {
+        ArrayDeque<PartitionReplica> partitionReplicas = replicasConnPool.get(partitionReplica.getIdentifier());
+        if (partitionReplicas != null) {
+            partitionReplicas.add(partitionReplica);
+        }
+    }
+
+    public synchronized void closeConnections() {
+        replicasConnPool
+                .forEach(((identifier, partitionReplicas) -> partitionReplicas.forEach(PartitionReplica::close)));
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index 063709a..8c514bd 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -21,22 +21,21 @@
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicationDestination;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.storage.DatasetResourceReference;
-import org.apache.asterix.common.storage.ResourceReference;
-import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.api.ReplicationDestination;
-import org.apache.asterix.replication.sync.IndexSynchronizer;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -45,13 +44,15 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final IReplicationManager replicationManager;
-    private final Set<ReplicationDestination> destinations = new HashSet<>();
+    private final Set<ReplicationDestination> destinations = ConcurrentHashMap.newKeySet();
     private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>();
     private final IReplicationStrategy replicationStrategy;
     private final PersistentLocalResourceRepository resourceRepository;
     private final INcApplicationContext appCtx;
+    private final ILSMIOOperationScheduler ioScheduler;
     private final Object transferLock = new Object();
     private final Set<ReplicationDestination> failedDest = new HashSet<>();
+    private final AtomicInteger pendingRepOpsCount = new AtomicInteger();
 
     public IndexReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
         this.appCtx = appCtx;
@@ -59,6 +60,8 @@
         this.resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         replicationStrategy = replicationManager.getReplicationStrategy();
         appCtx.getThreadExecutor().execute(new ReplicationJobsProcessor());
+        ioScheduler = appCtx.getStorageComponentProvider().getIoOperationSchedulerProvider()
+                .getIoScheduler(appCtx.getServiceContext());
     }
 
     public void register(ReplicationDestination dest) {
@@ -72,12 +75,18 @@
     public void unregister(IReplicationDestination dest) {
         synchronized (transferLock) {
             LOGGER.info(() -> "unregister " + dest);
+            for (ReplicationDestination existingDest : destinations) {
+                if (existingDest.equals(dest)) {
+                    existingDest.closeConnections();
+                    break;
+                }
+            }
             destinations.remove(dest);
             failedDest.remove(dest);
         }
     }
 
-    private void handleFailure(ReplicationDestination dest, Exception e) {
+    public void handleFailure(ReplicationDestination dest, Exception e) {
         synchronized (transferLock) {
             if (failedDest.contains(dest)) {
                 return;
@@ -87,6 +96,7 @@
                 LOGGER.error("replica at {} failed", dest);
                 failedDest.add(dest);
             }
+            dest.closeConnections();
             replicationManager.notifyFailure(dest, e);
         }
     }
@@ -99,71 +109,62 @@
         process(job);
     }
 
-    private void process(IReplicationJob job) {
-        try {
-            if (skip(job)) {
-                return;
-            }
-            synchronized (transferLock) {
-                if (destinations.isEmpty()) {
-                    return;
-                }
-                final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
-                final int indexPartition = getJobPartition(job);
-                for (ReplicationDestination dest : destinations) {
-                    try {
-                        Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
-                        if (!partitionReplica.isPresent()) {
-                            continue;
-                        }
-                        PartitionReplica replica = (PartitionReplica) partitionReplica.get();
-                        synchronizer.sync(replica);
-                    } catch (Exception e) {
-                        handleFailure(dest, e);
-                    }
-                }
-                closeChannels();
-            }
-        } finally {
-            afterReplication(job);
+    public Set<ReplicationDestination> getDestinations() {
+        synchronized (transferLock) {
+            return destinations;
         }
     }
 
-    private boolean skip(IReplicationJob job) {
-        try {
-            final String fileToReplicate = job.getAnyFile();
-            final Optional<DatasetResourceReference> indexFileRefOpt =
-                    resourceRepository.getLocalResourceReference(fileToReplicate);
-            if (!indexFileRefOpt.isPresent()) {
-                LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
-                return true;
+    private void process(IReplicationJob job) {
+        pendingRepOpsCount.incrementAndGet();
+        Optional<DatasetResourceReference> jobIndexRefOpt = getJobIndexRef(job);
+        if (jobIndexRefOpt.isEmpty()) {
+            LOGGER.warn("skipping replication of {} due to missing dataset resource reference", job.getAnyFile());
+            afterReplication(job);
+            return;
+        }
+        ReplicationOperation rp = new ReplicationOperation(appCtx, jobIndexRefOpt.get(), job, this);
+        if (job.getExecutionType() == IReplicationJob.ReplicationExecutionType.SYNC) {
+            rp.call();
+        } else {
+            try {
+                ioScheduler.scheduleOperation(rp);
+            } catch (HyracksDataException e) {
+                throw new ReplicationException(e);
             }
-            return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+        }
+    }
+
+    public boolean skip(DatasetResourceReference indexRef) {
+        return !replicationStrategy.isMatch(indexRef.getDatasetId());
+    }
+
+    public Optional<DatasetResourceReference> getJobIndexRef(IReplicationJob job) {
+        final String fileToReplicate = job.getAnyFile();
+        try {
+            return resourceRepository.getLocalResourceReference(fileToReplicate);
         } catch (HyracksDataException e) {
             throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
         }
     }
 
-    private int getJobPartition(IReplicationJob job) {
-        return ResourceReference.of(job.getAnyFile()).getPartitionNum();
-    }
-
     private void closeChannels() {
-        if (!replicationJobsQ.isEmpty()) {
-            return;
-        }
         LOGGER.trace("no pending replication jobs; closing connections to replicas");
         for (ReplicationDestination dest : destinations) {
-            dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close);
+            dest.closeConnections();
         }
     }
 
-    private static void afterReplication(IReplicationJob job) {
+    public void afterReplication(IReplicationJob job) {
         try {
+            int pendingOps = pendingRepOpsCount.decrementAndGet();
             if (job.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE
                     && job instanceof ILSMIndexReplicationJob) {
                 ((ILSMIndexReplicationJob) job).endReplication();
             }
+            if (pendingOps == 0 && replicationJobsQ.isEmpty()) {
+                closeChannels();
+            }
         } catch (HyracksDataException e) {
             throw new ReplicationException(e);
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java
new file mode 100644
index 0000000..258f24a
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.management;
+
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.asterix.replication.sync.IndexSynchronizer;
+import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractIoOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallbackFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * An IO operation that syncs the index of an {@link IReplicationJob} with every registered
+ * {@link ReplicationDestination} that has a replica of the index's partition.
+ */
+public class ReplicationOperation extends AbstractIoOperation {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    // callback obtained from the no-op factory; the same instance is passed to the parent by every operation
+    private static final ILSMIOOperationCallback NO_OP_CALLBACK =
+            NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(null);
+
+    private final INcApplicationContext appCtx;
+    private final DatasetResourceReference indexRef;
+    private final IReplicationJob job;
+    private final IndexReplicationManager indexReplicationManager;
+
+    /**
+     * @param appCtx the application context handed to the synchronizer and to replica connections
+     * @param indexRef the resource reference of the index to replicate
+     * @param job the replication job to execute
+     * @param indexReplicationManager the manager that supplies the destinations and is notified of
+     *            failures and of the job's completion
+     */
+    public ReplicationOperation(INcApplicationContext appCtx, DatasetResourceReference indexRef, IReplicationJob job,
+            IndexReplicationManager indexReplicationManager) {
+        super(null, null, NO_OP_CALLBACK, indexRef.getRelativePath().toString());
+        this.appCtx = appCtx;
+        this.indexRef = indexRef;
+        this.job = job;
+        this.indexReplicationManager = indexReplicationManager;
+    }
+
+    @Override
+    public LSMIOOperationType getIOOpertionType() {
+        return LSMIOOperationType.REPLICATE;
+    }
+
+    /**
+     * Syncs the job with each destination that has a replica of the index's partition, unless there are no
+     * destinations or the manager decides to skip the index. A failure on one destination is reported to the
+     * manager before moving on to the next destination.
+     * {@link IndexReplicationManager#afterReplication(IReplicationJob)} is invoked on every exit path.
+     *
+     * @return {@link LSMIOOperationStatus#SUCCESS} whenever the method returns normally
+     */
+    @Override
+    public LSMIOOperationStatus call() {
+        try {
+            final Set<ReplicationDestination> targets = indexReplicationManager.getDestinations();
+            if (targets.isEmpty() || indexReplicationManager.skip(indexRef)) {
+                return LSMIOOperationStatus.SUCCESS;
+            }
+            LOGGER.debug("started replicate operation on index {}", indexRef);
+            final IndexSynchronizer indexSync = new IndexSynchronizer(job, appCtx);
+            final int partition = indexRef.getPartitionId();
+            for (ReplicationDestination dest : targets) {
+                final Optional<IPartitionReplica> replicaOpt = dest.getPartitionReplica(partition);
+                if (replicaOpt.isPresent()) {
+                    syncTo(dest, replicaOpt.get(), indexSync);
+                }
+            }
+            LOGGER.debug("completed replicate operation on index {}", indexRef);
+            return LSMIOOperationStatus.SUCCESS;
+        } finally {
+            indexReplicationManager.afterReplication(job);
+        }
+    }
+
+    /**
+     * Syncs the job with a single destination over a connection obtained from that destination. The
+     * connection is recycled on success; on failure it is closed (if it was obtained) and the failure is
+     * reported to the manager.
+     */
+    private void syncTo(ReplicationDestination dest, IPartitionReplica replica, IndexSynchronizer indexSync) {
+        PartitionReplica connection = null;
+        try {
+            connection = dest.getPartitionReplicaConnection(replica.getIdentifier(), appCtx);
+            indexSync.sync(connection);
+            dest.recycleConnection(connection);
+        } catch (Exception e) {
+            if (connection != null) {
+                connection.close();
+            }
+            indexReplicationManager.handleFailure(dest, e);
+        }
+    }
+
+    @Override
+    protected LSMComponentFileReferences getComponentFiles() {
+        // this operation reports no component files
+        return null;
+    }
+
+    @Override
+    public long getRemainingPages() {
+        // this operation reports no remaining pages
+        return 0;
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 459ff01..68ccd54 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -103,6 +103,6 @@
     private void waitForReplicatedDatasetsIO() throws HyracksDataException {
         // wait for IO operations to ensure replicated datasets files won't change during replica sync
         final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
-        appCtx.getDatasetLifecycleManager().waitForIO(replStrategy);
+        appCtx.getDatasetLifecycleManager().waitForIO(replStrategy, replica.getIdentifier().getPartition());
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index a104ae3..827b713 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -21,6 +21,7 @@
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
@@ -46,7 +47,8 @@
     public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
-        return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId));
+        int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        return new BaseOperationTracker(datasetId, dslcManager.getDatasetInfo(datasetId), partition);
     }
 
     @Override
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 8a1cc65..1c614c9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -26,6 +28,7 @@
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.logging.log4j.Logger;
 
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongSet;
@@ -33,12 +36,16 @@
 
 public class LogManagerWithReplication extends LogManager {
 
+    private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger();
     private IReplicationManager replicationManager;
     private IReplicationStrategy replicationStrategy;
     private final LongSet replicatedTxn = LongSets.synchronize(new LongOpenHashSet());
+    private final long replicationTimeoutMillis;
 
     public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) {
         super(txnSubsystem);
+        replicationTimeoutMillis = TimeUnit.SECONDS
+                .toMillis(txnSubsystem.getApplicationContext().getReplicationProperties().getReplicationTimeOut());
     }
 
     @SuppressWarnings("squid:S2445")
@@ -94,8 +101,18 @@
                     //wait for job Commit/Abort ACK from replicas
                     if (logRecord.isReplicate() && (logRecord.getLogType() == LogType.JOB_COMMIT
                             || logRecord.getLogType() == LogType.ABORT)) {
+                        long replicationTimeOut = replicationTimeoutMillis;
                         while (!logRecord.isReplicated()) {
-                            logRecord.wait();
+                            if (replicationTimeOut <= 0) {
+                                LOGGER.warn(
+                                        "{} ms passed without receiving acks for log {}; setting log as replicated due to timeout",
+                                        replicationTimeoutMillis, logRecord.getLogRecordForDisplay());
+                                logRecord.setReplicated(true);
+                                continue;
+                            }
+                            final long startTime = System.nanoTime();
+                            logRecord.wait(replicationTimeOut);
+                            replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
                         }
                     }
                 }
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 681e9a8..30a987e 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -2021,12 +2021,12 @@
       <dependency>
         <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-util</artifactId>
-        <version>9.4.48.v20220622</version>
+        <version>9.4.51.v20230217</version>
       </dependency>
       <dependency>
         <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-util-ajax</artifactId>
-        <version>9.4.48.v20220622</version>
+        <version>9.4.51.v20230217</version>
       </dependency>
       <dependency>
         <groupId>org.apache.avro</groupId>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index 5d61da3..81ed7f0 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -163,9 +163,9 @@
       <artifactId>netty-transport</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -175,9 +175,9 @@
       <artifactId>netty-transport-classes-epoll</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -187,9 +187,9 @@
       <artifactId>netty-transport-native-unix-common</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -199,9 +199,9 @@
       <artifactId>netty-codec</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -211,9 +211,9 @@
       <artifactId>netty-codec-dns</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -223,9 +223,9 @@
       <artifactId>netty-codec-http2</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -235,9 +235,9 @@
       <artifactId>netty-codec-socks</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -247,9 +247,9 @@
       <artifactId>netty-handler</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -259,9 +259,9 @@
       <artifactId>netty-handler-proxy</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -271,9 +271,9 @@
       <artifactId>netty-buffer</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -283,9 +283,9 @@
       <artifactId>netty-common</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -295,9 +295,9 @@
       <artifactId>netty-codec-http</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -307,9 +307,9 @@
       <artifactId>netty-resolver</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -319,9 +319,9 @@
       <artifactId>netty-resolver-dns</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -331,9 +331,9 @@
       <artifactId>netty-transport-native-unix-common</artifactId>
       <properties>
         <!-- netty is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
-        <license.ignoreMissingEmbeddedLicense>4.1.87.Final</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>4.1.87.Final</license.ignoreMissingEmbeddedNotice>
-        <license.ignoreNoticeOverride>4.1.87.Final</license.ignoreNoticeOverride>
+        <license.ignoreMissingEmbeddedLicense>4.1.94.Final</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>4.1.94.Final</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>4.1.94.Final</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
@@ -463,9 +463,10 @@
       <properties>
         <!-- snappy-java is ALv2, and does not contain any embedded LICENSE or NOTICE file -->
         <!-- license override not needed, ALv2 is specified in its pom.xml -->
-        <!-- see https://raw.githubusercontent.com/xerial/snappy-java/1.1.8.4/LICENSE -->
-        <license.ignoreMissingEmbeddedLicense>1.1.8.4</license.ignoreMissingEmbeddedLicense>
-        <license.ignoreMissingEmbeddedNotice>1.1.8.4</license.ignoreMissingEmbeddedNotice>
+        <!-- see https://raw.githubusercontent.com/xerial/snappy-java/v1.1.10.1/LICENSE -->
+        <license.ignoreMissingEmbeddedLicense>1.1.10.1</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>1.1.10.1</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreNoticeOverride>1.1.10.1</license.ignoreNoticeOverride>
       </properties>
     </project>
   </supplement>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.87.Final_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.94.Final_NOTICE.txt
similarity index 100%
rename from asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.87.Final_NOTICE.txt
rename to asterixdb/src/main/licenses/content/raw.githubusercontent.com_netty_netty_netty-4.1.94.Final_NOTICE.txt
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_xerial_snappy-java_v1.1.10.1_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_xerial_snappy-java_v1.1.10.1_NOTICE.txt
new file mode 100644
index 0000000..19301705
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_xerial_snappy-java_v1.1.10.1_NOTICE.txt
@@ -0,0 +1,16 @@
+This product includes software developed by Google
+ Snappy: http://code.google.com/p/snappy/ (New BSD License)
+
+This product includes software developed by Apache
+ PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+ (Apache 2.0 license)
+
+This library containd statically linked libstdc++. This inclusion is allowed by 
+"GCC RUntime Library Exception" 
+http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+== Contributors ==
+  * Tatu Saloranta  
+    * Providing benchmark suite
+  * Alec Wysoker
+    * Performance and memory usage improvement
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
index d74f500..75fbb92 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
@@ -25,6 +25,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Objects;
 
 import org.apache.hyracks.api.io.IWritable;
 
@@ -82,6 +83,10 @@
         return inetSocketAddress;
     }
 
+    public InetSocketAddress toInetSocketAddress() {
+        return new InetSocketAddress(address, port);
+    }
+
     public int getPort() {
         return port;
     }
@@ -102,7 +107,7 @@
             return false;
         }
         NetworkAddress on = (NetworkAddress) o;
-        return on.port == port && on.address == address;
+        return on.port == port && Objects.equals(on.address, address);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
index b483158..7fc0335 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
@@ -20,7 +20,9 @@
 
 import java.io.File;
 import java.io.Serializable;
+import java.net.InetAddress;
 import java.security.KeyStore;
+import java.util.Optional;
 
 public interface INetworkSecurityConfig extends Serializable {
 
@@ -65,4 +67,11 @@
      * @return the trust store file
      */
     File getTrustStoreFile();
+
+    /**
+     * The optional address to bind for RMI server sockets; or absent to bind to all addresses / interfaces.
+     *
+     * @return the optional bind address
+     */
+    Optional<InetAddress> getRMIBindAddress();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 7d04cf2..57f9750 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.util.ComputingAction;
 import org.apache.hyracks.util.IDelay;
 import org.apache.hyracks.util.IOInterruptibleAction;
+import org.apache.hyracks.util.IOThrowingAction;
 import org.apache.hyracks.util.IRetryPolicy;
 import org.apache.hyracks.util.InterruptibleAction;
 import org.apache.hyracks.util.Span;
@@ -188,7 +189,7 @@
         }
     }
 
-    @SuppressWarnings({ "squid:S1181", "squid:S1193" }) // catching Throwable, instanceof of exception
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
     public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
         Throwable savedT = null;
         boolean suppressedInterrupted = false;
@@ -225,6 +226,55 @@
         }
     }
 
+    /**
+     * Runs {@code action} and then every one of {@code cleanups}, in order, even when the action or an
+     * earlier cleanup has thrown. The first throwable seen becomes the primary failure; throwables from
+     * later cleanups are added to it as suppressed. If a suppressed throwable is an
+     * {@link InterruptedException}, the current thread is re-interrupted before the primary failure is thrown.
+     *
+     * @param action the main action to run
+     * @param cleanups the actions to run afterwards, whether or not anything has failed
+     * @throws IOException the primary failure when it is an {@link IOException}; an {@link Error} is rethrown
+     *             as is, and anything else is passed through {@code HyracksDataException.create(...)}
+     */
+    @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs
+    public static void tryIoWithCleanups(IOThrowingAction action, IOThrowingAction... cleanups) throws IOException {
+        Throwable savedT = null;
+        boolean suppressedInterrupted = false;
+        try {
+            action.run();
+        } catch (Throwable t) {
+            savedT = t;
+        } finally {
+            for (IOThrowingAction cleanup : cleanups) {
+                try {
+                    cleanup.run();
+                } catch (Throwable t) {
+                    // the first failure stays primary; later cleanup failures ride along as suppressed
+                    if (savedT != null) {
+                        savedT.addSuppressed(t);
+                        suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException;
+                    } else {
+                        savedT = t;
+                    }
+                }
+            }
+        }
+        if (savedT == null) {
+            return;
+        }
+        if (suppressedInterrupted) {
+            Thread.currentThread().interrupt();
+        }
+        if (savedT instanceof Error) {
+            throw (Error) savedT;
+        } else if (savedT instanceof IOException) {
+            throw (IOException) savedT;
+        } else {
+            throw HyracksDataException.create(savedT);
+        }
+    }
+
     /**
      * Runs the supplied action, after suspending any pending interruption. An error will be logged if
      * the action is itself interrupted.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
index e3135df..9a08e8e2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -47,4 +47,10 @@
             callback.setException(e);
         }
     }
+
+    /** Returns {@code getName()} followed by {@code " jobId:"} and the job id. */
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 727793b..c3a09f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -37,7 +37,7 @@
 public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private JobId jobId;
+    private final JobId jobId;
 
     public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId jobId, String nodeId) {
         super(ccs, nodeId, null);
@@ -77,4 +77,10 @@
             }
         }
     }
+
+    /** Returns {@code getName()} followed by the job id and the node id, each with its label. */
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId + ", nodeId:" + nodeId;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index b1700fe..ec21785 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -50,8 +51,9 @@
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
         LOGGER.info("registering node: {}", id);
-        NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(),
-                ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress().resolveInetSocketAddress()));
+        InetSocketAddress ncAddress = reg.getNodeControllerAddress().toInetSocketAddress();
+        NodeControllerRemoteProxy nc =
+                new NodeControllerRemoteProxy(ccs.getCcId(), ccs.getClusterIPC().getReconnectingHandle(ncAddress));
         INodeManager nodeManager = ccs.getNodeManager();
         NodeParameters params = new NodeParameters();
         params.setClusterControllerInfo(ccs.getClusterControllerInfo());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index 14d92fb..ed3e574 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -87,4 +87,10 @@
             });
         }
     }
+
+    /** Returns {@code getName()} followed by {@code " jobId:"} and the job id. */
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 8c9cbfb..f69d106 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -426,7 +426,7 @@
         NodeParameters nodeParameters = ccc.getNodeParameters();
         // Start heartbeat generator.
         heartbeatManagers.computeIfAbsent(ccId, newCcId -> HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(),
-                nodeRegistration.getNodeControllerAddress().resolveInetSocketAddress()));
+                nodeRegistration.getNodeControllerAddress().toInetSocketAddress()));
         if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) {
             Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index f47e1ce..bfe3706 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -70,4 +70,10 @@
                     "Joblet couldn't be found. Tasks of job " + jobId + " have all either completed or failed");
         }
     }
+
+    /** Returns {@code getName()} followed by {@code " jobId:"} and the job id. */
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index ae2cfa0..75edd38 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -35,7 +35,7 @@
 
     private final JobId jobId;
 
-    private JobStatus status;
+    private final JobStatus status;
 
     public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
         this.ncs = ncs;
@@ -54,4 +54,10 @@
             joblet.cleanup(status);
         }
     }
+
+    /** Returns {@code getName()} followed by the job id and the job status, each with its label. */
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId + ", status:" + status;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index f6c144d..f277046 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -315,4 +315,10 @@
         }
         return channelsForInputConnectors;
     }
+
+    /** Returns {@code getName()} followed by {@code " jobId:"} and the job id. */
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index e00c519..740af2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -25,6 +25,8 @@
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 import org.apache.logging.log4j.Level;
@@ -45,6 +47,7 @@
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.util.ReferenceCountUtil;
 
 /**
  * A chunked http response. Here is how it is expected to work:
@@ -114,28 +117,50 @@
 
+    /**
+     * Closes this response. First the writer is closed, or the output stream when {@code writer} is null.
+     * If no error buffer is set and the status is OK, {@link LastHttpContent#EMPTY_LAST_CONTENT} is sent
+     * unless the response is already done. Otherwise the channel is closed when the header has already been
+     * sent, or a full (non-chunked) response carrying the error buffer is sent when it has not. As cleanup
+     * steps, the output stream is closed and any error buffer still held is released, even if an earlier
+     * step threw.
+     *
+     * @throws IOException if any of the steps above fails
+     */
     @Override
     public void close() throws IOException {
-        if (writer != null) {
-            writer.close();
-        } else {
-            outputStream.close();
-        }
-        if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
-            if (!done) {
-                respond(LastHttpContent.EMPTY_LAST_CONTENT);
-            }
-        } else {
-            // There was an error
-            if (headerSent) {
-                LOGGER.log(Level.WARN, "Error after header write of chunked response");
-                if (errorBuf != null) {
-                    errorBuf.release();
+        try {
+            InvokeUtil.tryIoWithCleanups(() -> {
+                if (writer != null) {
+                    writer.close();
+                } else {
+                    outputStream.close();
                 }
-                future = ctx.channel().close().addListener(handler);
-            } else {
-                // we didn't send anything to the user, we need to send an non-chunked error response
-                fullResponse(response.protocolVersion(), response.status(),
-                        errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers());
-            }
+                if (errorBuf == null && response.status() == HttpResponseStatus.OK) {
+                    if (!done) {
+                        respond(LastHttpContent.EMPTY_LAST_CONTENT);
+                    }
+                } else {
+                    // There was an error
+                    if (headerSent) {
+                        LOGGER.log(Level.WARN, "Error after header write of chunked response");
+                        future = ctx.channel().close().addListener(handler);
+                    } else {
+                        // we didn't send anything to the user, we need to send a non-chunked error response
+                        fullResponse(response.protocolVersion(), response.status(),
+                                errorBuf == null ? ctx.alloc().buffer(0, 0) : errorBuf, response.headers());
+                        // The responsibility of releasing the error buffer is now with the netty pipeline since it is
+                        // forwarded within the http content. We must nullify buffer to avoid releasing the buffer twice.
+                        errorBuf = null;
+                    }
+                }
+            }, outputStream::close, () -> {
+                ReferenceCountUtil.release(errorBuf);
+                // We must nullify buffer to avoid releasing the buffer twice in case of duplicate close()
+                errorBuf = null;
+            });
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            // any non-IO exception escaping the helper is wrapped before being rethrown
+            throw HyracksDataException.create(e);
         }
         done = true;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
index 2170c15..bfcd623 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
@@ -22,10 +22,12 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.net.InetAddress;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.CertificateException;
+import java.util.Optional;
 
 import org.apache.hyracks.api.network.INetworkSecurityConfig;
 
@@ -90,6 +92,12 @@
         return trustStoreFile;
     }
 
+    /** Always returns {@link Optional#empty()}: this implementation does not supply an RMI bind address. */
+    @Override
+    public Optional<InetAddress> getRMIBindAddress() {
+        return Optional.empty();
+    }
+
     private void writeObject(ObjectOutputStream out) throws IOException {
         out.defaultWriteObject();
         writeStore(keyStore, out);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
index e266a6f..049da38 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -38,9 +38,10 @@
 
     private final int maxNumFlushes;
     protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
+    protected final Map<String, ILSMIOOperation> runningReplicateOperations = new HashMap<>();
     protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>();
     protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>();
-
+    protected final Deque<ILSMIOOperation> waitingReplicateOperations = new ArrayDeque<>();
     protected final Map<String, Throwable> failedGroups = new HashMap<>();
 
     public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback,
@@ -58,8 +59,11 @@
             case MERGE:
                 scheduleMerge(operation);
                 break;
+            case REPLICATE:
+                scheduleReplicate(operation);
+                break;
             case NOOP:
-                return;
+                break;
             default:
                 // this should never happen
                 // just guard here to avoid silent failures in case of future extensions
@@ -75,6 +79,10 @@
                 break;
             case MERGE:
                 completeMerge(operation);
+                break;
+            case REPLICATE:
+                completeReplicate(operation);
+                break;
             case NOOP:
                 return;
             default:
@@ -149,6 +157,54 @@
         }
     }
 
+    /**
+     * Submits a replicate operation right away when a slot is free and no replicate is already running for
+     * the same index; otherwise parks it in the waiting queue.
+     */
+    private void scheduleReplicate(ILSMIOOperation operation) {
+        final String indexId = operation.getIndexIdentifier();
+        synchronized (executor) {
+            boolean slotFree = runningReplicateOperations.size() < maxNumFlushes;
+            if (slotFree && !runningReplicateOperations.containsKey(indexId)) {
+                runningReplicateOperations.put(indexId, operation);
+                executor.submit(operation);
+                return;
+            }
+            waitingReplicateOperations.add(operation);
+        }
+    }
+
+    /**
+     * Removes the finished replicate operation from the running set, promotes waiting operations in FIFO
+     * order while slots remain (at most one running replicate per index), then trims already-handled
+     * entries from the head of the waiting queue.
+     */
+    private void completeReplicate(ILSMIOOperation operation) {
+        final String finishedId = operation.getIndexIdentifier();
+        synchronized (executor) {
+            runningReplicateOperations.remove(finishedId);
+            // promote waiting operations in arrival order until the running set is full
+            for (ILSMIOOperation pending : waitingReplicateOperations) {
+                final String pendingId = pending.getIndexIdentifier();
+                if (runningReplicateOperations.size() >= maxNumFlushes) {
+                    break;
+                }
+                if (runningReplicateOperations.containsKey(pendingId) || pending.isCompleted()) {
+                    continue;
+                }
+                runningReplicateOperations.put(pendingId, pending);
+                executor.submit(pending);
+            }
+            // drop head entries that are completed or are now the running operation for their index
+            ILSMIOOperation head = waitingReplicateOperations.peek();
+            while (head != null && (head.isCompleted()
+                    || runningReplicateOperations.get(head.getIndexIdentifier()) == head)) {
+                waitingReplicateOperations.poll();
+                head = waitingReplicateOperations.peek();
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
         executor.shutdown();
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
new file mode 100644
index 0000000..b9430d8
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOThrowingAction.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+
+/**
+ * An action that takes no arguments, returns no result, and may throw an {@link IOException}.
+ */
+@FunctionalInterface
+public interface IOThrowingAction {
+    /**
+     * Performs the action.
+     *
+     * @throws IOException if the action fails with an I/O error
+     */
+    void run() throws IOException; // NOSONAR
+
+    /**
+     * Adapts an {@link IOThrowingAction} to a {@link ComputingAction} that runs the action and then
+     * returns {@code null}.
+     *
+     * @param action the action to adapt
+     * @return a computing action wrapping {@code action}
+     */
+    static ComputingAction<Void> asComputingAction(IOThrowingAction action) {
+        return () -> {
+            action.run();
+            return null;
+        };
+    }
+}
diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml
index df23608..5ed10e8 100644
--- a/hyracks-fullstack/pom.xml
+++ b/hyracks-fullstack/pom.xml
@@ -73,10 +73,10 @@
     <hadoop.version>3.3.1</hadoop.version>
     <jacoco.version>0.7.6.201602180812</jacoco.version>
     <log4j.version>2.19.0</log4j.version>
-    <snappy.version>1.1.8.4</snappy.version>
-    <jackson.version>2.14.1</jackson.version>
+    <snappy.version>1.1.10.1</snappy.version>
+    <jackson.version>2.14.3</jackson.version>
     <jackson-databind.version>${jackson.version}</jackson-databind.version>
-    <netty.version>4.1.87.Final</netty.version>
+    <netty.version>4.1.94.Final</netty.version>
 
     <implementation.title>Apache Hyracks and Algebricks - ${project.name}</implementation.title>
     <implementation.url>https://asterixdb.apache.org/</implementation.url>
@@ -112,6 +112,16 @@
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
+        <artifactId>netty-codec-dns</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
+        <artifactId>netty-codec-socks</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
         <artifactId>netty-handler</artifactId>
         <version>${netty.version}</version>
       </dependency>
@@ -127,6 +137,11 @@
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
+        <artifactId>netty-resolver</artifactId>
+        <version>${netty.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.netty</groupId>
         <artifactId>netty-resolver-dns</artifactId>
         <version>${netty.version}</version>
       </dependency>
@@ -482,12 +497,12 @@
       <dependency>
         <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-util</artifactId>
-        <version>9.4.48.v20220622</version>
+        <version>9.4.51.v20230217</version>
       </dependency>
       <dependency>
         <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-util-ajax</artifactId>
-        <version>9.4.48.v20220622</version>
+        <version>9.4.51.v20230217</version>
       </dependency>
       <!-- Manually included to avoid CVE-2023-1370 -->
       <dependency>