Merge commit 'bd728afe' from stabilization-f69489

Change-Id: Ic382adcb9e6f0a084a029206a0785d979a45faad
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index b034f47..62f2c02 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -92,9 +92,9 @@
     private AlgebricksAbsolutePartitionConstraint locations;
     protected ActivityState prevState;
     protected JobId jobId;
-    protected long statsTimestamp;
+    protected volatile long statsTimestamp;
     protected String stats;
-    protected boolean isFetchingStats;
+    protected volatile boolean isFetchingStats;
     protected int numRegistered;
     protected int numDeRegistered;
     protected volatile RecoveryTask rt;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 3c62d99..420585a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -55,7 +55,8 @@
     }
 
     @Override
-    public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException {
+    public synchronized void init(long validComponentSequence, long lsn, long validComponentId)
+            throws HyracksDataException {
         List<IndexCheckpoint> checkpoints;
         try {
             checkpoints = getCheckpoints();
@@ -66,7 +67,7 @@
             LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
             delete();
         }
-        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn);
+        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn, validComponentId);
         persist(firstCheckpoint);
     }
 
@@ -196,6 +197,9 @@
                 // ensure it was written correctly by reading it
                 read(checkpointPath);
                 return;
+            } catch (ClosedByInterruptException e) {
+                LOGGER.info("interrupted while writing checkpoint at {}", checkpointPath);
+                throw HyracksDataException.create(e);
             } catch (IOException e) {
                 if (i == MAX_CHECKPOINT_WRITE_ATTEMPTS) {
                     throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
index 22bc323..a81a142 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/split-materialization-above-join.plan
@@ -32,26 +32,26 @@
                                               -- STREAM_PROJECT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                   -- HYBRID_HASH_JOIN [$$prefixTokenLeft][$$prefixTokenRight]  |PARTITIONED|
-                                                    -- HASH_PARTITION_EXCHANGE [$$prefixTokenLeft]  |PARTITIONED|
-                                                      -- UNNEST  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- PRE_CLUSTERED_GROUP_BY[$$95]  |PARTITIONED|
-                                                                  {
-                                                                    -- AGGREGATE  |LOCAL|
-                                                                      -- STREAM_SELECT  |LOCAL|
-                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                  }
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- STABLE_SORT [$$95(ASC), $$i(ASC)]  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$95]  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped]  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ASSIGN  |PARTITIONED|
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- REPLICATE  |PARTITIONED|
+                                                              -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight]  |PARTITIONED|
+                                                                -- UNNEST  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- PRE_CLUSTERED_GROUP_BY[$$97]  |PARTITIONED|
+                                                                            {
+                                                                              -- AGGREGATE  |LOCAL|
+                                                                                -- STREAM_SELECT  |LOCAL|
+                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                            }
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- STABLE_SORT [$$97(ASC), $$i(ASC)]  |PARTITIONED|
+                                                                          -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- REPLICATE  |PARTITIONED|
+                                                                                -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped]  |PARTITIONED|
                                                                                   -- HASH_PARTITION_EXCHANGE [$$tokenUnranked]  |PARTITIONED|
                                                                                     -- STREAM_PROJECT  |PARTITIONED|
                                                                                       -- UNNEST  |PARTITIONED|
@@ -62,24 +62,19 @@
                                                                                                 -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                     -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$tokenGroupped]  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- RUNNING_AGGREGATE  |PARTITIONED|
-                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                -- SORT_MERGE_EXCHANGE [$$99(ASC), $$tokenGroupped(ASC) ]  |PARTITIONED|
-                                                                                  -- STABLE_SORT [$$99(ASC), $$tokenGroupped(ASC)]  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- SORT_GROUP_BY[$$128]  |PARTITIONED|
-                                                                                              {
-                                                                                                -- AGGREGATE  |LOCAL|
-                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                              }
-                                                                                        -- HASH_PARTITION_EXCHANGE [$$128]  |PARTITIONED|
-                                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                                            -- ASSIGN  |PARTITIONED|
+                                                                                  -- HASH_PARTITION_EXCHANGE [$$tokenGroupped]  |PARTITIONED|
+                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                      -- RUNNING_AGGREGATE  |PARTITIONED|
+                                                                                        -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ]  |PARTITIONED|
+                                                                                            -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)]  |PARTITIONED|
                                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                -- REPLICATE  |PARTITIONED|
-                                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                -- SORT_GROUP_BY[$$130]  |PARTITIONED|
+                                                                                                        {
+                                                                                                          -- AGGREGATE  |LOCAL|
+                                                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                        }
+                                                                                                  -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
                                                                                                     -- SORT_GROUP_BY[$$token]  |PARTITIONED|
                                                                                                             {
                                                                                                               -- AGGREGATE  |LOCAL|
@@ -97,23 +92,23 @@
                                                                                                                         -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                                                             -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight]  |PARTITIONED|
-                                                      -- UNNEST  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- PRE_CLUSTERED_GROUP_BY[$$97]  |PARTITIONED|
-                                                                  {
-                                                                    -- AGGREGATE  |LOCAL|
-                                                                      -- STREAM_SELECT  |LOCAL|
-                                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                  }
+                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                      -- REPLICATE  |PARTITIONED|
+                                                        -- HASH_PARTITION_EXCHANGE [$$prefixTokenRight]  |PARTITIONED|
+                                                          -- UNNEST  |PARTITIONED|
                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- STABLE_SORT [$$97(ASC), $$i(ASC)]  |PARTITIONED|
-                                                                -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped]  |PARTITIONED|
+                                                              -- PRE_CLUSTERED_GROUP_BY[$$97]  |PARTITIONED|
+                                                                      {
+                                                                        -- AGGREGATE  |LOCAL|
+                                                                          -- STREAM_SELECT  |LOCAL|
+                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                      }
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- STABLE_SORT [$$97(ASC), $$i(ASC)]  |PARTITIONED|
+                                                                    -- HASH_PARTITION_EXCHANGE [$$97]  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- REPLICATE  |PARTITIONED|
+                                                                          -- HYBRID_HASH_JOIN [$$tokenUnranked][$$tokenGroupped]  |PARTITIONED|
                                                                             -- HASH_PARTITION_EXCHANGE [$$tokenUnranked]  |PARTITIONED|
                                                                               -- STREAM_PROJECT  |PARTITIONED|
                                                                                 -- UNNEST  |PARTITIONED|
@@ -124,21 +119,19 @@
                                                                                           -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                               -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                        -- HASH_PARTITION_EXCHANGE [$$tokenGroupped]  |PARTITIONED|
-                                                                          -- ASSIGN  |PARTITIONED|
-                                                                            -- RUNNING_AGGREGATE  |PARTITIONED|
-                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ]  |PARTITIONED|
-                                                                                  -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)]  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- SORT_GROUP_BY[$$130]  |PARTITIONED|
-                                                                                              {
-                                                                                                -- AGGREGATE  |LOCAL|
-                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                              }
-                                                                                        -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
-                                                                                          -- REPLICATE  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- HASH_PARTITION_EXCHANGE [$$tokenGroupped]  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                -- RUNNING_AGGREGATE  |PARTITIONED|
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                    -- SORT_MERGE_EXCHANGE [$$102(ASC), $$tokenGroupped(ASC) ]  |PARTITIONED|
+                                                                                      -- STABLE_SORT [$$102(ASC), $$tokenGroupped(ASC)]  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- SORT_GROUP_BY[$$130]  |PARTITIONED|
+                                                                                                  {
+                                                                                                    -- AGGREGATE  |LOCAL|
+                                                                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                                  }
+                                                                                            -- HASH_PARTITION_EXCHANGE [$$130]  |PARTITIONED|
                                                                                               -- SORT_GROUP_BY[$$token]  |PARTITIONED|
                                                                                                       {
                                                                                                         -- AGGREGATE  |LOCAL|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
index c636836..fff35d0 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581-correlated.plan
@@ -51,16 +51,19 @@
                                                         -- HYBRID_HASH_JOIN [$$120][$$130]  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                             -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                  -- ASSIGN  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- REPLICATE  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                      -- REPLICATE  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- BTREE_SEARCH  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- ASSIGN  |PARTITIONED|
-                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                        -- ASSIGN  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- REPLICATE  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- ASSIGN  |PARTITIONED|
+                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                             -- STREAM_PROJECT  |PARTITIONED|
                                                               -- ASSIGN  |PARTITIONED|
@@ -83,50 +86,49 @@
                                                   -- STREAM_PROJECT  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- HYBRID_HASH_JOIN [$$134][$$133]  |PARTITIONED|
-                                                        -- HASH_PARTITION_EXCHANGE [$$134]  |PARTITIONED|
+                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                             -- ASSIGN  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- STREAM_SELECT  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- PRE_CLUSTERED_GROUP_BY[$$142]  |PARTITIONED|
-                                                                            {
-                                                                              -- AGGREGATE  |LOCAL|
-                                                                                -- AGGREGATE  |LOCAL|
-                                                                                  -- STREAM_SELECT  |LOCAL|
-                                                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                            }
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- REPLICATE  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ASSIGN  |PARTITIONED|
                                                                         -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- HYBRID_HASH_JOIN [$$142][$$144]  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                          -- STREAM_SELECT  |PARTITIONED|
+                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                              -- PRE_CLUSTERED_GROUP_BY[$$176]  |PARTITIONED|
+                                                                                      {
+                                                                                        -- AGGREGATE  |LOCAL|
+                                                                                          -- AGGREGATE  |LOCAL|
+                                                                                            -- STREAM_SELECT  |LOCAL|
+                                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                      }
+                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                  -- STREAM_PROJECT  |PARTITIONED|
                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                      -- HYBRID_HASH_JOIN [$$176][$$177]  |PARTITIONED|
                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- BTREE_SEARCH  |PARTITIONED|
+                                                                                          -- REPLICATE  |PARTITIONED|
                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- ASSIGN  |PARTITIONED|
-                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- REPLICATE  |PARTITIONED|
-                                                                                        -- HASH_PARTITION_EXCHANGE [$$177]  |PARTITIONED|
-                                                                                          -- ASSIGN  |PARTITIONED|
-                                                                                            -- STREAM_PROJECT  |PARTITIONED|
-                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                              -- BTREE_SEARCH  |PARTITIONED|
                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- REPLICATE  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                          -- REPLICATE  |PARTITIONED|
+                                                                                            -- HASH_PARTITION_EXCHANGE [$$177]  |PARTITIONED|
+                                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
                                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- REPLICATE  |PARTITIONED|
                                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
                                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                         -- HASH_PARTITION_EXCHANGE [$$133]  |PARTITIONED|
                                                           -- STREAM_PROJECT  |PARTITIONED|
                                                             -- ASSIGN  |PARTITIONED|
@@ -178,7 +180,7 @@
                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                             -- HYBRID_HASH_JOIN [$$158][$$159]  |PARTITIONED|
                                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- REPLICATE  |PARTITIONED|
                                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                     -- STREAM_PROJECT  |PARTITIONED|
                                                                                       -- ASSIGN  |PARTITIONED|
@@ -211,44 +213,46 @@
                                                             -- STREAM_PROJECT  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                 -- HYBRID_HASH_JOIN [$$171][$$170]  |PARTITIONED|
-                                                                  -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
-                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                      -- ASSIGN  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- REPLICATE  |PARTITIONED|
+                                                                      -- HASH_PARTITION_EXCHANGE [$$171]  |PARTITIONED|
                                                                         -- STREAM_PROJECT  |PARTITIONED|
-                                                                          -- STREAM_SELECT  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- PRE_CLUSTERED_GROUP_BY[$$176]  |PARTITIONED|
-                                                                                      {
-                                                                                        -- AGGREGATE  |LOCAL|
-                                                                                          -- AGGREGATE  |LOCAL|
-                                                                                            -- STREAM_SELECT  |LOCAL|
-                                                                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                                      }
+                                                                          -- ASSIGN  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- STREAM_SELECT  |PARTITIONED|
                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- PRE_CLUSTERED_GROUP_BY[$$176]  |PARTITIONED|
+                                                                                          {
+                                                                                            -- AGGREGATE  |LOCAL|
+                                                                                              -- AGGREGATE  |LOCAL|
+                                                                                                -- STREAM_SELECT  |LOCAL|
+                                                                                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                                          }
                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                      -- HYBRID_HASH_JOIN [$$176][$$177]  |PARTITIONED|
+                                                                                      -- STREAM_PROJECT  |PARTITIONED|
                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- REPLICATE  |PARTITIONED|
+                                                                                          -- HYBRID_HASH_JOIN [$$176][$$177]  |PARTITIONED|
                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                                                              -- REPLICATE  |PARTITIONED|
                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- REPLICATE  |PARTITIONED|
-                                                                                            -- HASH_PARTITION_EXCHANGE [$$177]  |PARTITIONED|
-                                                                                              -- ASSIGN  |PARTITIONED|
-                                                                                                -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                  -- BTREE_SEARCH  |PARTITIONED|
                                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                      -- REPLICATE  |PARTITIONED|
+                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                              -- REPLICATE  |PARTITIONED|
+                                                                                                -- HASH_PARTITION_EXCHANGE [$$177]  |PARTITIONED|
+                                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                      -- ASSIGN  |PARTITIONED|
                                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                          -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                          -- REPLICATE  |PARTITIONED|
                                                                                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                              -- STREAM_PROJECT  |PARTITIONED|
                                                                                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                                   -- HASH_PARTITION_EXCHANGE [$$170]  |PARTITIONED|
                                                                     -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ASSIGN  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
index 38d0b5f..d0afd76 100644
--- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/tpcds/query-ASTERIXDB-1581.plan
@@ -45,24 +45,29 @@
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                               -- HYBRID_HASH_JOIN [$$85][$$142]  |PARTITIONED|
-                                                -- HASH_PARTITION_EXCHANGE [$$85]  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |UNPARTITIONED|
-                                                    -- ASSIGN  |UNPARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                        -- REPLICATE  |UNPARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                            -- AGGREGATE  |UNPARTITIONED|
-                                                              -- AGGREGATE  |UNPARTITIONED|
-                                                                -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
-                                                                  -- AGGREGATE  |PARTITIONED|
-                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                      -- STREAM_SELECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- STREAM_PROJECT  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- REPLICATE  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$151]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |UNPARTITIONED|
+                                                              -- ASSIGN  |UNPARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                  -- REPLICATE  |UNPARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                      -- AGGREGATE  |UNPARTITIONED|
+                                                                        -- AGGREGATE  |UNPARTITIONED|
+                                                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                            -- AGGREGATE  |PARTITIONED|
+                                                                              -- STREAM_PROJECT  |PARTITIONED|
+                                                                                -- STREAM_SELECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
+                                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                 -- HASH_PARTITION_EXCHANGE [$$142]  |PARTITIONED|
                                                   -- ASSIGN  |PARTITIONED|
                                                     -- STREAM_PROJECT  |PARTITIONED|
@@ -122,24 +127,26 @@
                                                                     -- STREAM_PROJECT  |PARTITIONED|
                                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                         -- HYBRID_HASH_JOIN [$$151][$$152]  |PARTITIONED|
-                                                                          -- HASH_PARTITION_EXCHANGE [$$151]  |PARTITIONED|
-                                                                            -- STREAM_PROJECT  |UNPARTITIONED|
-                                                                              -- ASSIGN  |UNPARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                                  -- REPLICATE  |UNPARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- REPLICATE  |PARTITIONED|
+                                                                              -- HASH_PARTITION_EXCHANGE [$$151]  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |UNPARTITIONED|
+                                                                                  -- ASSIGN  |UNPARTITIONED|
                                                                                     -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
-                                                                                      -- AGGREGATE  |UNPARTITIONED|
-                                                                                        -- AGGREGATE  |UNPARTITIONED|
-                                                                                          -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
-                                                                                            -- AGGREGATE  |PARTITIONED|
-                                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                -- STREAM_SELECT  |PARTITIONED|
-                                                                                                  -- ASSIGN  |PARTITIONED|
-                                                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                      -- REPLICATE  |UNPARTITIONED|
+                                                                                        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+                                                                                          -- AGGREGATE  |UNPARTITIONED|
+                                                                                            -- AGGREGATE  |UNPARTITIONED|
+                                                                                              -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                                                                                                -- AGGREGATE  |PARTITIONED|
+                                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                                    -- STREAM_SELECT  |PARTITIONED|
+                                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                                        -- STREAM_PROJECT  |PARTITIONED|
                                                                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                                           -- HASH_PARTITION_EXCHANGE [$$152]  |PARTITIONED|
                                                                             -- ASSIGN  |PARTITIONED|
                                                                               -- STREAM_PROJECT  |PARTITIONED|
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index dd9ede5..2f0eddf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -29,9 +29,10 @@
      *
      * @param validComponentSequence
      * @param lsn
+     * @param validComponentId
      * @throws HyracksDataException
      */
-    void init(long validComponentSequence, long lsn) throws HyracksDataException;
+    void init(long validComponentSequence, long lsn, long validComponentId) throws HyracksDataException;
 
     /**
      * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 9654473..cb34600 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -42,12 +41,12 @@
     private long lastComponentId;
     private Map<Long, Long> masterNodeFlushMap;
 
-    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) {
+    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId) {
         IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
         firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
         firstCheckpoint.lowWatermark = lowWatermark;
         firstCheckpoint.validComponentSequence = lastComponentSequence;
-        firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
+        firstCheckpoint.lastComponentId = validComponentId;
         firstCheckpoint.masterNodeFlushMap = new HashMap<>();
         return firstCheckpoint;
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 448613b..e778cce 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -45,9 +45,11 @@
 public class CheckpointPartitionIndexesTask implements IReplicaTask {
 
     private final int partition;
+    private final long maxComponentId;
 
-    public CheckpointPartitionIndexesTask(int partition) {
+    public CheckpointPartitionIndexesTask(int partition, long maxComponentId) {
         this.partition = partition;
+        this.maxComponentId = maxComponentId;
     }
 
     @Override
@@ -75,7 +77,7 @@
                 maxComponentSequence =
                         Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
             }
-            indexCheckpointManager.init(maxComponentSequence, currentLSN);
+            indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId);
         }
         ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
     }
@@ -90,6 +92,7 @@
         try {
             DataOutputStream dos = new DataOutputStream(out);
             dos.writeInt(partition);
+            dos.writeLong(maxComponentId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -98,7 +101,8 @@
     public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException {
         try {
             int partition = input.readInt();
-            return new CheckpointPartitionIndexesTask(partition);
+            long maxComponentId = input.readLong();
+            return new CheckpointPartitionIndexesTask(partition, maxComponentId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index f53d448..ae36c13 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -98,7 +99,8 @@
         final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         indexCheckpointManager.delete();
-        indexCheckpointManager.init(Long.MIN_VALUE, currentLSN);
+        indexCheckpointManager.init(Long.MIN_VALUE, currentLSN,
+                LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
         LOGGER.info(() -> "Checkpoint index: " + indexRef);
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index ef85977..09f1205 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -25,6 +25,8 @@
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Performs the steps required to ensure any newly added replica
@@ -60,9 +62,17 @@
     }
 
     private void checkpointReplicaIndexes() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
         CheckpointPartitionIndexesTask task =
-                new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition());
+                new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
         ReplicationProtocol.sendTo(replica, task);
         ReplicationProtocol.waitForAck(replica);
     }
+
+    private long getPartitionMaxComponentId(int partition) throws HyracksDataException {
+        final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        return localResourceRepository.getReplicatedIndexesMaxComponentId(partition, replStrategy);
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index c0da095..8f870c0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -66,8 +66,8 @@
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.util.ExitUtil;
@@ -196,7 +196,8 @@
             byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
             final Path path = Paths.get(resourceFile.getAbsolutePath());
             Files.write(path, bytes);
-            indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0);
+            indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0,
+                    LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
             deleteResourceFileMask(resourceFile);
         } catch (Exception e) {
             cleanup(resourceFile);
@@ -393,6 +394,21 @@
         return partitionReplicatedFiles;
     }
 
+    public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
+            throws HyracksDataException {
+        long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
+        final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+        for (LocalResource lr : partitionResources.values()) {
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                final IIndexCheckpointManager indexCheckpointManager =
+                        indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
+                maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId());
+            }
+        }
+        return maxComponentId;
+    }
+
     private List<String> getIndexFiles(File indexDir) {
         final List<String> indexFiles = new ArrayList<>();
         if (indexDir.isDirectory()) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 64e50ed..c18d76c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -89,6 +89,10 @@
         IPartitioningProperty pp = childProp.getPartitioningProperty();
         Map<LogicalVariable, LogicalVariable> ppSubstMap = computePartitioningPropertySubstitutionMap(gby, pp);
         if (ppSubstMap != null) {
+            // We cannot modify pp directly, since it is owned by the input operator.
+            // Otherwise, the partitioning property would be modified even before this group by operator,
+            // which will be undesirable.
+            pp = pp.clonePartitioningProperty();
             pp.substituteColumnVars(ppSubstMap);
         }
         List<ILocalStructuralProperty> childLocals = childProp.getLocalProperties();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
index bc6a45d..3e78fd2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
@@ -60,6 +60,12 @@
 
     @Override
     public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+
+    }
+
+    @Override
+    public IPartitioningProperty clonePartitioningProperty() {
+        return new BroadcastPartitioningProperty(domain);
     }
 
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
index f41d197..5164192 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
@@ -77,6 +77,8 @@
     void setNodeDomain(INodeDomain domain);
 
     void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap);
+
+    IPartitioningProperty clonePartitioningProperty();
 }
 
 class UnpartitionedProperty implements IPartitioningProperty {
@@ -116,4 +118,9 @@
     public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> variableMap) {
         // No partition columns are maintained for UNPARTITIONED.
     }
+
+    @Override
+    public IPartitioningProperty clonePartitioningProperty() {
+        return new UnpartitionedProperty();
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index 23c8273..b5a2bb5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -28,7 +28,7 @@
 
 public class OrderedPartitionedProperty implements IPartitioningProperty {
 
-    private List<OrderColumn> orderColumns;
+    private final List<OrderColumn> orderColumns;
     private INodeDomain domain;
 
     public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain) {
@@ -92,4 +92,9 @@
         }
     }
 
+    @Override
+    public IPartitioningProperty clonePartitioningProperty() {
+        return new OrderedPartitionedProperty(new ArrayList<>(orderColumns), domain);
+    }
+
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
index bbd835c..951a031 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
@@ -65,6 +65,12 @@
 
     @Override
     public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+
+    }
+
+    @Override
+    public IPartitioningProperty clonePartitioningProperty() {
+        return new RandomPartitioningProperty(domain);
     }
 
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index f59638c..5966407 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 
@@ -77,4 +78,9 @@
         });
     }
 
+    @Override
+    public IPartitioningProperty clonePartitioningProperty() {
+        return new UnorderedPartitionedProperty(new ListSet<>(columnSet), domain);
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
index f02654e..a67b40e 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ChunkedResponse.java
@@ -79,9 +79,8 @@
         response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
         response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
         keepAlive = HttpUtil.isKeepAlive(request);
-        if (keepAlive) {
-            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
-        }
+        response.headers().set(HttpHeaderNames.CONNECTION,
+                keepAlive ? HttpHeaderValues.KEEP_ALIVE : HttpHeaderValues.CLOSE);
     }
 
     @Override
@@ -119,17 +118,10 @@
                 }
                 future = ctx.channel().close();
             } else {
-                if (keepAlive && response.status() != HttpResponseStatus.UNAUTHORIZED) {
-                    response.headers().remove(HttpHeaderNames.CONNECTION);
-                }
-                // we didn't send anything to the user, we need to send an unchunked error response
+                // we didn't send anything to the user, we need to send an non-chunked error response
                 fullResponse(response.protocolVersion(), response.status(),
                         error == null ? ctx.alloc().buffer(0, 0) : error, response.headers());
             }
-            if (response.status() != HttpResponseStatus.UNAUTHORIZED) {
-                // since the request failed, we need to close the channel on complete
-                future.addListener(ChannelFutureListener.CLOSE);
-            }
         }
         done = true;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 3d928a4..9199fbb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -577,7 +577,7 @@
         if (c != EmptyComponent.INSTANCE) {
             diskComponents.add(0, c);
         }
-        assert checkComponentIds();
+        validateComponentIds();
     }
 
     @Override
@@ -588,7 +588,7 @@
         if (newComponent != EmptyComponent.INSTANCE) {
             diskComponents.add(swapIndex, newComponent);
         }
-        assert checkComponentIds();
+        validateComponentIds();
     }
 
     /**
@@ -597,16 +597,16 @@
      *
      * @throws HyracksDataException
      */
-    private boolean checkComponentIds() throws HyracksDataException {
+    private void validateComponentIds() throws HyracksDataException {
         for (int i = 0; i < diskComponents.size() - 1; i++) {
             ILSMComponentId id1 = diskComponents.get(i).getId();
             ILSMComponentId id2 = diskComponents.get(i + 1).getId();
             IdCompareResult cmp = id1.compareTo(id2);
             if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) {
-                return false;
+                throw new IllegalStateException(
+                        "found non-decreasing component ids (" + id1 + " -> " + id2 + ") on index " + this);
             }
         }
-        return true;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index ab59b59..3fa45c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -101,8 +101,8 @@
             for (int i = 0; i < bulkloadersCount; i++) {
                 bulkloaderChain.get(i).cleanupArtifacts();;
             }
+            diskComponent.deactivateAndDestroy();
         }
-        diskComponent.deactivateAndDestroy();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 3a43ba7..c739ad0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
@@ -31,6 +32,7 @@
     private final AbstractLSMIndex lsmIndex;
     private final ILSMDiskComponentBulkLoader componentBulkLoader;
     private final ILSMIndexOperationContext opCtx;
+    private boolean failed = false;
 
     public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext opCtx, float fillFactor,
             boolean verifyInput, long numElementsHint) throws HyracksDataException {
@@ -68,20 +70,10 @@
     @Override
     public void end() throws HyracksDataException {
         try {
-            try {
-                lsmIndex.getIOOperationCallback().afterOperation(opCtx.getIoOperation());
-                componentBulkLoader.end();
-            } catch (Throwable th) { // NOSONAR Must not call afterFinalize without setting failure
-                opCtx.getIoOperation().setStatus(LSMIOOperationStatus.FAILURE);
-                opCtx.getIoOperation().setFailure(th);
-                throw th;
-            } finally {
-                lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
-            }
-            if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
-                    && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
-                lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation());
-            }
+            presistComponentToDisk();
+        } catch (Throwable th) { // NOSONAR must cleanup in case of any failure
+            fail(th);
+            throw th;
         } finally {
             lsmIndex.getIOOperationCallback().completed(opCtx.getIoOperation());
         }
@@ -116,4 +108,28 @@
         return opCtx.getIoOperation().getFailure();
     }
 
+    private void presistComponentToDisk() throws HyracksDataException {
+        try {
+            lsmIndex.getIOOperationCallback().afterOperation(opCtx.getIoOperation());
+            componentBulkLoader.end();
+        } catch (Throwable th) { // NOSONAR Must not call afterFinalize without setting failure
+            fail(th);
+            throw th;
+        } finally {
+            lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
+        }
+        if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
+                && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
+            lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation());
+        }
+    }
+
+    private void fail(Throwable th) {
+        if (!failed) {
+            failed = true;
+            final ILSMIOOperation loadOp = opCtx.getIoOperation();
+            loadOp.setFailure(th);
+            lsmIndex.cleanUpFilesForFailedOperation(loadOp);
+        }
+    }
 }
\ No newline at end of file