Merge commit '142aad08' from stabilization-f69489

Change-Id: I7ebe1f339fa27fc269f56f4e394622b452310638
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index b1a1fcc..453ffa0 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -26,7 +26,6 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -135,9 +134,4 @@
     public final IFrameWriter getInputFrameWriter(int index) {
         return null;
     }
-
-    @Override
-    public JobId getJobId() {
-        return ctx.getJobletContext().getJobId();
-    }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index 2da7193..a52f01e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -21,7 +21,6 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 
 public interface IActiveRuntime {
 
@@ -44,11 +43,6 @@
     void stop(long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException;
 
     /**
-     * @return the job id associated with this active runtime
-     */
-    JobId getJobId();
-
-    /**
      * @return the runtime stats for monitoring purposes
      */
     default String getStats() {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
index 1c762af..03c4bc5 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SqlppExpressionToPlanTranslator.java
@@ -23,8 +23,10 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -736,16 +738,18 @@
             throws CompilationException {
         List<Expression> recordExprs = new ArrayList<>();
         List<FieldBinding> fieldBindings = new ArrayList<>();
+        Set<String> fieldNames = new HashSet<>();
+
         for (Projection projection : selectRegular.getProjections()) {
             if (projection.varStar()) {
-                SourceLocation sourceLoc = projection.getSourceLocation();
                 if (!fieldBindings.isEmpty()) {
                     RecordConstructor recordConstr = new RecordConstructor(new ArrayList<>(fieldBindings));
-                    recordConstr.setSourceLocation(sourceLoc);
+                    recordConstr.setSourceLocation(selectRegular.getSourceLocation());
                     recordExprs.add(recordConstr);
                     fieldBindings.clear();
                 }
                 Expression projectionExpr = projection.getExpression();
+                SourceLocation sourceLoc = projection.getSourceLocation();
                 CallExpr toObjectExpr = new CallExpr(new FunctionSignature(BuiltinFunctions.TO_OBJECT),
                         Collections.singletonList(projectionExpr));
                 toObjectExpr.setSourceLocation(sourceLoc);
@@ -755,21 +759,22 @@
                 recordExprs.add(ifMissingOrNullExpr);
             } else if (projection.star()) {
                 if (selectBlock.hasGroupbyClause()) {
-                    getGroupBindings(selectBlock.getGroupbyClause(), fieldBindings);
+                    getGroupBindings(selectBlock.getGroupbyClause(), fieldBindings, fieldNames);
                     if (selectBlock.hasLetClausesAfterGroupby()) {
-                        getLetBindings(selectBlock.getLetListAfterGroupby(), fieldBindings);
+                        getLetBindings(selectBlock.getLetListAfterGroupby(), fieldBindings, fieldNames);
                     }
                 } else if (selectBlock.hasFromClause()) {
-                    getFromBindings(selectBlock.getFromClause(), fieldBindings);
+                    getFromBindings(selectBlock.getFromClause(), fieldBindings, fieldNames);
                     if (selectBlock.hasLetClauses()) {
-                        getLetBindings(selectBlock.getLetList(), fieldBindings);
+                        getLetBindings(selectBlock.getLetList(), fieldBindings, fieldNames);
                     }
                 } else if (selectBlock.hasLetClauses()) {
-                    getLetBindings(selectBlock.getLetList(), fieldBindings);
+                    getLetBindings(selectBlock.getLetList(), fieldBindings, fieldNames);
                 }
+            } else if (projection.hasName()) {
+                fieldBindings.add(getFieldBinding(projection, fieldNames));
             } else {
-                fieldBindings.add(new FieldBinding(new LiteralExpr(new StringLiteral(projection.getName())),
-                        projection.getExpression()));
+                throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, projection.getSourceLocation());
             }
         }
         if (!fieldBindings.isEmpty()) {
@@ -789,49 +794,68 @@
     }
 
     // Generates all field bindings according to the from clause.
-    private void getFromBindings(FromClause fromClause, List<FieldBinding> outFieldBindings) {
+    private void getFromBindings(FromClause fromClause, List<FieldBinding> outFieldBindings, Set<String> outFieldNames)
+            throws CompilationException {
         for (FromTerm fromTerm : fromClause.getFromTerms()) {
-            outFieldBindings.add(getFieldBinding(fromTerm.getLeftVariable()));
+            outFieldBindings.add(getFieldBinding(fromTerm.getLeftVariable(), outFieldNames));
             if (fromTerm.hasPositionalVariable()) {
-                outFieldBindings.add(getFieldBinding(fromTerm.getPositionalVariable()));
+                outFieldBindings.add(getFieldBinding(fromTerm.getPositionalVariable(), outFieldNames));
             }
             if (!fromTerm.hasCorrelateClauses()) {
                 continue;
             }
             for (AbstractBinaryCorrelateClause correlateClause : fromTerm.getCorrelateClauses()) {
-                outFieldBindings.add(getFieldBinding(correlateClause.getRightVariable()));
+                outFieldBindings.add(getFieldBinding(correlateClause.getRightVariable(), outFieldNames));
                 if (correlateClause.hasPositionalVariable()) {
-                    outFieldBindings.add(getFieldBinding(correlateClause.getPositionalVariable()));
+                    outFieldBindings.add(getFieldBinding(correlateClause.getPositionalVariable(), outFieldNames));
                 }
             }
         }
     }
 
     // Generates all field bindings according to the from clause.
-    private void getGroupBindings(GroupbyClause groupbyClause, List<FieldBinding> outFieldBindings) {
+    private void getGroupBindings(GroupbyClause groupbyClause, List<FieldBinding> outFieldBindings,
+            Set<String> outFieldNames) throws CompilationException {
         for (GbyVariableExpressionPair pair : groupbyClause.getGbyPairList()) {
-            outFieldBindings.add(getFieldBinding(pair.getVar()));
+            outFieldBindings.add(getFieldBinding(pair.getVar(), outFieldNames));
         }
         if (groupbyClause.hasGroupVar()) {
-            outFieldBindings.add(getFieldBinding(groupbyClause.getGroupVar()));
+            outFieldBindings.add(getFieldBinding(groupbyClause.getGroupVar(), outFieldNames));
         }
         if (groupbyClause.hasWithMap()) {
-            throw new IllegalStateException(groupbyClause.getWithVarMap().values().toString()); // no WITH in SQLPP
+            // no WITH in SQLPP
+            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, groupbyClause.getSourceLocation(),
+                    groupbyClause.getWithVarMap().values().toString());
         }
     }
 
     // Generates all field bindings according to the let clause.
-    private void getLetBindings(List<LetClause> letClauses, List<FieldBinding> outFieldBindings) {
+    private void getLetBindings(List<LetClause> letClauses, List<FieldBinding> outFieldBindings,
+            Set<String> outFieldNames) throws CompilationException {
         for (LetClause letClause : letClauses) {
-            outFieldBindings.add(getFieldBinding(letClause.getVarExpr()));
+            outFieldBindings.add(getFieldBinding(letClause.getVarExpr(), outFieldNames));
         }
     }
 
     // Generates a field binding for a variable.
-    private FieldBinding getFieldBinding(VariableExpr var) {
-        LiteralExpr fieldName = new LiteralExpr(
-                new StringLiteral(SqlppVariableUtil.variableNameToDisplayedFieldName(var.getVar().getValue())));
-        return new FieldBinding(fieldName, var);
+    private FieldBinding getFieldBinding(VariableExpr varExpr, Set<String> outFieldNames) throws CompilationException {
+        String fieldName = SqlppVariableUtil.variableNameToDisplayedFieldName(varExpr.getVar().getValue());
+        return generateFieldBinding(fieldName, varExpr, outFieldNames, varExpr.getSourceLocation());
+    }
+
+    // Generates a field binding for a named projection.
+    private FieldBinding getFieldBinding(Projection projection, Set<String> outFieldNames) throws CompilationException {
+        String fieldName = projection.getName();
+        Expression fieldValueExpr = projection.getExpression();
+        return generateFieldBinding(fieldName, fieldValueExpr, outFieldNames, projection.getSourceLocation());
+    }
+
+    private FieldBinding generateFieldBinding(String fieldName, Expression fieldValueExpr, Set<String> outFieldNames,
+            SourceLocation sourceLoc) throws CompilationException {
+        if (!outFieldNames.add(fieldName)) {
+            throw new CompilationException(ErrorCode.DUPLICATE_FIELD_NAME, sourceLoc, fieldName);
+        }
+        return new FieldBinding(new LiteralExpr(new StringLiteral(fieldName)), fieldValueExpr);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 5d722e7..0172b28 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -47,7 +47,6 @@
     private final IRetryPolicyFactory retryPolicyFactory;
     private final MetadataProvider metadataProvider;
     private final IClusterStateManager clusterStateManager;
-    private Exception failure;
 
     public RecoveryTask(ICcApplicationContext appCtx, ActiveEntityEventsListener listener,
             IRetryPolicyFactory retryPolicyFactory) {
@@ -105,50 +104,46 @@
         }
     }
 
-    protected Void doRecover(IRetryPolicy policy)
-            throws AlgebricksException, HyracksDataException, InterruptedException {
+    protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, InterruptedException {
         LOGGER.log(level, "Actual Recovery task has started");
-        if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
-            LOGGER.log(level, "but its state is not temp failure and so we're just returning");
-            return null;
-        }
-        LOGGER.log(level, "calling the policy");
+        Exception failure = null;
         while (policy.retry(failure)) {
             synchronized (listener) {
-                if (cancelRecovery) {
-                    return null;
-                }
-                while (clusterStateManager.getState() != ClusterState.ACTIVE) {
-                    if (cancelRecovery) {
-                        return null;
-                    }
+                while (!cancelRecovery && clusterStateManager.getState() != ClusterState.ACTIVE) {
                     listener.wait();
                 }
+                if (cancelRecovery) {
+                    LOGGER.log(level, "Recovery has been cancelled");
+                    return null;
+                }
             }
             IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager();
-            lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
-                    listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
-            for (Dataset dataset : listener.getDatasets()) {
-                lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
-                lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
-                        DatasetUtil.getFullyQualifiedName(dataset));
-            }
-            synchronized (listener) {
-                try {
-                    if (cancelRecovery) {
-                        return null;
-                    }
-                    listener.setState(ActivityState.RECOVERING);
-                    listener.doStart(metadataProvider);
-                    return null;
-                } catch (Exception e) {
-                    LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
-                    listener.setState(ActivityState.TEMPORARILY_FAILED);
-                    failure = e;
-                } finally {
-                    metadataProvider.getLocks().reset();
+            try {
+                lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+                        listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName());
+                for (Dataset dataset : listener.getDatasets()) {
+                    lockManager.acquireDataverseReadLock(metadataProvider.getLocks(), dataset.getDataverseName());
+                    lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+                            DatasetUtil.getFullyQualifiedName(dataset));
                 }
-                listener.notifyAll();
+                synchronized (listener) {
+                    try {
+                        if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+                            listener.setState(ActivityState.RECOVERING);
+                            listener.doStart(metadataProvider);
+                        }
+                        LOGGER.log(level, "Recovery completed successfully");
+                        return null;
+                    } finally {
+                        listener.notifyAll();
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e);
+                listener.setState(ActivityState.TEMPORARILY_FAILED);
+                failure = e;
+            } finally {
+                metadataProvider.getLocks().reset();
             }
         }
         // Recovery task is essntially over now either through failure or through cancellation(stop)
@@ -160,6 +155,9 @@
                     // 1. set the state to permanent failure.
                     // 2. set the entity to not running to avoid auto recovery attempt
                     && listener.getState() != ActivityState.SUSPENDED) {
+                LOGGER.log(level, "Recovery is cancelled because the current state {} is neither {} nor {}",
+                        listener.getState(), ActivityState.TEMPORARILY_FAILED,
+                        ActivityState.SUSPENDED);
                 return null;
             }
         }
@@ -172,10 +170,7 @@
                         DatasetUtil.getFullyQualifiedName(dataset));
             }
             synchronized (listener) {
-                if (cancelRecovery) {
-                    return null;
-                }
-                if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+                if (!cancelRecovery && listener.getState() == ActivityState.TEMPORARILY_FAILED) {
                     LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
                     listener.setState(ActivityState.STOPPED);
                     listener.setRunning(metadataProvider, false);
@@ -187,8 +182,4 @@
         }
         return null;
     }
-
-    public Exception getFailure() {
-        return failure;
-    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
index 9ad870c..84f7831 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelWriteInterface.java
@@ -54,7 +54,7 @@
             ecodeSent = true;
             ccb.reportLocalEOS();
             adjustChannelWritability();
-        } else if (eos && !eosSent) {
+        } else if (isPendingCloseWrite()) {
             writerState.getCommand().setChannelId(channelId);
             writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
             writerState.getCommand().setData(0);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 22dec60..e67246a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -65,7 +65,6 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.stubbing.Answer;
 
 public class CheckpointingTest {
 
@@ -134,7 +133,7 @@
                 ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
                 LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
                 // Number of log files after node startup should be one
-                int numberOfLogFiles = logManager.getLogFileIds().size();
+                int numberOfLogFiles = logManager.getOrderedLogFileIds().size();
                 Assert.assertEquals(1, numberOfLogFiles);
 
                 // Low-water mark LSN
@@ -142,10 +141,10 @@
                 // Low-water mark log file id
                 long initialLowWaterMarkFileId = logManager.getLogFileId(lowWaterMarkLSN);
                 // Initial Low-water mark should be in the only available log file
-                Assert.assertEquals(initialLowWaterMarkFileId, logManager.getLogFileIds().get(0).longValue());
+                Assert.assertEquals(initialLowWaterMarkFileId, logManager.getOrderedLogFileIds().get(0).longValue());
 
                 // Insert records until a new log file is created
-                while (logManager.getLogFileIds().size() == 1) {
+                while (logManager.getOrderedLogFileIds().size() == 1) {
                     ITupleReference tuple = tupleGenerator.next();
                     DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
                 }
@@ -160,9 +159,9 @@
                      * the low-water mark is still in it (i.e. it is still required for
                      * recovery)
                      */
-                    int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size();
+                    int numberOfLogFilesBeforeCheckpoint = logManager.getOrderedLogFileIds().size();
                     checkpointManager.tryCheckpoint(logManager.getAppendLSN());
-                    int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size();
+                    int numberOfLogFilesAfterCheckpoint = logManager.getOrderedLogFileIds().size();
                     Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint);
 
                     /*
@@ -203,7 +202,7 @@
 
                     checkpointManager.tryCheckpoint(lowWaterMarkLSN);
                     // Validate initialLowWaterMarkFileId was deleted
-                    for (Long fileId : logManager.getLogFileIds()) {
+                    for (Long fileId : logManager.getOrderedLogFileIds()) {
                         Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
                     }
 
@@ -298,7 +297,7 @@
                 // Make sure the valid checkout wouldn't force full recovery
                 Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= minFirstLSN);
                 // Add a corrupted (empty) checkpoint file with a timestamp > than current checkpoint
-                Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1);
+                Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId() + 1);
                 File corruptedCheckpoint = corruptedCheckpointPath.toFile();
                 corruptedCheckpoint.createNewFile();
                 // Make sure the corrupted checkpoint file was created
@@ -306,11 +305,11 @@
                 // Try to get the latest checkpoint again
                 Checkpoint cpAfterCorruption = checkpointManager.getLatest();
                 // Make sure the valid checkpoint was returned
-                Assert.assertEquals(validCheckpoint.getTimeStamp(), cpAfterCorruption.getTimeStamp());
+                Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId());
                 // Make sure the corrupted checkpoint file was deleted
                 Assert.assertFalse(corruptedCheckpoint.exists());
                 // Corrupt the valid checkpoint by replacing its content
-                final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp());
+                final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId());
                 File validCheckpointFile = validCheckpointPath.toFile();
                 Assert.assertTrue(validCheckpointFile.exists());
                 // Delete the valid checkpoint file and create it as an empty file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index 4453a1d..964bf66 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -147,7 +147,7 @@
     public void interruptedLogFileSwitch() throws Exception {
         final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
         final LogManager logManager = (LogManager) ncAppCtx.getTransactionSubsystem().getLogManager();
-        int logFileCountBeforeInterrupt = logManager.getLogFileIds().size();
+        int logFileCountBeforeInterrupt = logManager.getOrderedLogFileIds().size();
 
         // ensure an interrupted transactor will create next log file but will fail to position the log channel
         final AtomicBoolean failed = new AtomicBoolean(false);
@@ -162,7 +162,7 @@
         interruptedTransactor.start();
         interruptedTransactor.join();
         // ensure a new log file was created and survived interrupt
-        int logFileCountAfterInterrupt = logManager.getLogFileIds().size();
+        int logFileCountAfterInterrupt = logManager.getOrderedLogFileIds().size();
         Assert.assertEquals(logFileCountBeforeInterrupt + 1, logFileCountAfterInterrupt);
         Assert.assertFalse(failed.get());
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.01.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.01.ddl.sqlpp
new file mode 100644
index 0000000..12a6034
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.01.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type test.empInfoType as
+{
+  id : bigint
+};
+
+create  dataset empDataset(empInfoType) primary key id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.02.update.sqlpp
new file mode 100644
index 0000000..04ab807
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.02.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+load  dataset empDataset using localfs ((`path`=`asterix_nc1://data/types/empDataset.adm`),(`format`=`adm`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.03.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.03.query.sqlpp
new file mode 100644
index 0000000..9577855
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.03.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ *              Project expression and constant
+ */
+
+use test;
+
+select empno as a, 2 as a
+from empDataset
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.04.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.04.query.sqlpp
new file mode 100644
index 0000000..5eac30e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.04.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ *              Project expression and expression
+ */
+
+use test;
+
+select empno as b, name as b
+from empDataset
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.05.query.sqlpp
new file mode 100644
index 0000000..ea4372a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.05.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ *              FROM variable and another field
+ */
+
+use test;
+
+select c, empno as c
+from empDataset c
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.06.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.06.query.sqlpp
new file mode 100644
index 0000000..788577b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.06.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ *              Project * (FROM variable) and another field
+ */
+
+use test;
+
+select *, empno as d
+from empDataset d
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.07.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.07.query.sqlpp
new file mode 100644
index 0000000..232d29c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.07.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ *              Project * (LET variable) and another field
+ */
+
+use test;
+
+select *, emp.name as e
+from empDataset emp
+let e = emp.deptno
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.08.query.sqlpp
new file mode 100644
index 0000000..48c96a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.08.query.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ *              Project * (GROUP BY key variable) and another field
+ */
+
+use test;
+
+select *, count(*) as f
+from empDataset emp
+group by emp.age f
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.09.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.09.query.sqlpp
new file mode 100644
index 0000000..9655bb7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/flwor/query-ASTERIXDB-2446-2/query-ASTERIXDB-2446-2.09.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: Ensure error if there's a duplicate field name in the SELECT clause
+ *              Project * (LET after GROUP BY variable) and another field
+ */
+
+use test;
+
+select *, count(*) as g
+from empDataset emp
+group by emp.age f
+let g = f + 1
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 6edc7bf..9bb995e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -100,6 +100,18 @@
         <expected-error>ASX0013: Duplicate field name "e"</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="flwor">
+      <compilation-unit name="query-ASTERIXDB-2446-2">
+        <output-dir compare="Text">query-ASTERIXDB-883</output-dir>
+        <expected-error>ASX0013: Duplicate field name "a" (in line 27, at column 20)</expected-error>
+        <expected-error>ASX0013: Duplicate field name "b" (in line 27, at column 20)</expected-error>
+        <expected-error>ASX0013: Duplicate field name "c" (in line 27, at column 11)</expected-error>
+        <expected-error>ASX0013: Duplicate field name "d" (in line 27, at column 11)</expected-error>
+        <expected-error>ASX0013: Duplicate field name "e" (in line 27, at column 14)</expected-error>
+        <expected-error>ASX0013: Duplicate field name "f" (in line 27, at column 11)</expected-error>
+        <expected-error>ASX0013: Duplicate field name "g" (in line 27, at column 11)</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="explain">
     <test-case FilePath="explain">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index f84167e..9654473 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -24,6 +24,8 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -31,6 +33,7 @@
 
 public class IndexCheckpoint {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final long INITIAL_CHECKPOINT_ID = 0;
     private long id;
@@ -52,6 +55,9 @@
     public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
             long lastComponentId) {
         if (lowWatermark < latest.getLowWatermark()) {
+            if (LOGGER.isErrorEnabled()) {
+                LOGGER.error("low watermark {} less than the latest checkpoint low watermark {}", lowWatermark, latest);
+            }
             throw new IllegalStateException("Low watermark should always be increasing");
         }
         IndexCheckpoint next = new IndexCheckpoint();
@@ -104,4 +110,17 @@
             throw HyracksDataException.create(e);
         }
     }
+
+    /**
+     * Returns the JSON form of this checkpoint, or a minimal fallback description if serialization fails.
+     * Never throws, so it is safe to call from error-logging paths.
+     */
+    @Override
+    public String toString() {
+        try {
+            return asJson();
+        } catch (HyracksDataException e) {
+            return "IndexCheckpoint{id=" + id + ", serializationError=" + e.getMessage() + "}";
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index 7f1a1c7..8fe0353 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -18,28 +18,29 @@
  */
 package org.apache.asterix.common.transactions;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class Checkpoint implements Comparable<Checkpoint>, IJsonSerializable {
 
     private static final long serialVersionUID = 1L;
     private final long checkpointLsn;
     private final long minMCTFirstLsn;
     private final long maxTxnId;
-    private final long timeStamp;
     private final boolean sharp;
     private final int storageVersion;
+    private final long id; // assigned once in the constructor; basis of compareTo, equals and hashCode
 
-    public Checkpoint(long checkpointLsn, long minMCTFirstLsn, long maxTxnId, long timeStamp, boolean sharp,
+    public Checkpoint(long id, long checkpointLsn, long minMCTFirstLsn, long maxTxnId, boolean sharp,
             int storageVersion) {
+        this.id = id;
         this.checkpointLsn = checkpointLsn;
         this.minMCTFirstLsn = minMCTFirstLsn;
         this.maxTxnId = maxTxnId;
-        this.timeStamp = timeStamp;
         this.sharp = sharp;
         this.storageVersion = storageVersion;
     }
@@ -56,8 +57,8 @@
         return maxTxnId;
     }
 
-    public long getTimeStamp() {
-        return timeStamp;
+    public long getId() {
+        return id;
     }
 
     public boolean isSharp() {
@@ -69,68 +70,47 @@
     }
 
     @Override
-    public int compareTo(Checkpoint checkpoint) {
-        long compareTimeStamp = checkpoint.getTimeStamp();
-
-        // Descending order
-        long diff = compareTimeStamp - this.timeStamp;
-        if (diff > 0) {
-            return 1;
-        } else if (diff == 0) {
-            return 0;
-        } else {
-            return -1;
-        }
+    public int compareTo(Checkpoint other) {
+        return Long.compare(this.id, other.id);
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
+    public boolean equals(Object o) {
+        if (this == o) {
             return true;
         }
-        if (obj == null) {
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        if (!(obj instanceof Checkpoint)) {
-            return false;
-        }
-        Checkpoint other = (Checkpoint) obj;
-        return compareTo(other) == 0;
+        Checkpoint that = (Checkpoint) o;
+        return id == that.id;
     }
 
     @Override
     public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32));
-        result = prime * result + Long.hashCode(maxTxnId);
-        result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32));
-        result = prime * result + (sharp ? 1231 : 1237);
-        result = prime * result + storageVersion;
-        result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32));
-        return result;
+        return Long.hashCode(id);
     }
 
     @Override
     public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
         final ObjectNode checkpointJson = registry.getClassIdentifier(getClass(), serialVersionUID);
+        checkpointJson.put("id", id);
         checkpointJson.put("checkpointLsn", checkpointLsn);
         checkpointJson.put("minMCTFirstLsn", minMCTFirstLsn);
         checkpointJson.put("maxTxnId", maxTxnId);
-        checkpointJson.put("timeStamp", timeStamp);
-        checkpointJson.put("sharp", timeStamp);
+        checkpointJson.put("sharp", sharp);
         checkpointJson.put("storageVersion", storageVersion);
         return checkpointJson;
     }
 
     @SuppressWarnings("squid:S1172") // unused parameter
     public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+        long id = json.get("id").asLong();
         long checkpointLsn = json.get("checkpointLsn").asLong();
         long minMCTFirstLsn = json.get("minMCTFirstLsn").asLong();
         long maxTxnId = json.get("maxTxnId").asLong();
-        long timeStamp = json.get("timeStamp").asLong();
         boolean sharp = json.get("sharp").asBoolean();
         int storageVersion = json.get("storageVersion").asInt();
-        return new Checkpoint(checkpointLsn, minMCTFirstLsn, maxTxnId, timeStamp, sharp, storageVersion);
+        return new Checkpoint(id, checkpointLsn, minMCTFirstLsn, maxTxnId, sharp, storageVersion);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index e3cf8b8..36cea55 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.transactions;
 
-import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 
@@ -26,10 +25,8 @@
 
     /**
      * @return The latest checkpoint on disk if any exists. Otherwise null.
-     * @throws ACIDException
-     *             when a checkpoint file cannot be read.
      */
-    Checkpoint getLatest() throws ACIDException;
+    Checkpoint getLatest();
 
     /**
      * Performs a sharp checkpoint.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 19b006f..644f3c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -41,7 +41,7 @@
     /**
      * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
      */
-    private static final int LOCAL_STORAGE_VERSION = 4;
+    private static final int LOCAL_STORAGE_VERSION = 5;
 
     /**
      * The storage version of AsterixDB stack.
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index ba8074fe..b855981 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -206,6 +206,12 @@
             public void finish() throws HyracksDataException {
                 lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
             }
+
+            @Override
+            public void fail(Throwable th) {
+                // We must fail before we exit the components
+                frameOpCallback.fail(th);
+            }
         };
         tracer = ctx.getJobletContext().getServiceContext().getTracer();
         traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
@@ -314,12 +320,7 @@
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int itemCount = accessor.getTupleCount();
-        try {
-            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
-        } catch (Throwable th) {// NOSONAR: Must notify of all failures
-            frameOpCallback.fail(th);
-            throw th;
-        }
+        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
         }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index c0d18df..0a6dda9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -29,7 +29,6 @@
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -103,7 +102,8 @@
         nodeId = txnSubsystem.getId();
         flushLogsQ = new LinkedBlockingQueue<>();
         txnSubsystem.getApplicationContext().getThreadExecutor().execute(new FlushLogsLogger());
-        initializeLogManager(SMALLEST_LOG_FILE_ID);
+        final long onDiskMaxLogFileId = getOnDiskMaxLogFileId();
+        initializeLogManager(onDiskMaxLogFileId);
     }
 
     private void initializeLogManager(long nextLogFileId) {
@@ -365,56 +365,32 @@
         }
     }
 
-    private long initializeLogAnchor(long nextLogFileId) {
-        long fileId = 0;
-        long offset = 0;
-        File fileLogDir = new File(logDir);
-        try {
-            if (fileLogDir.exists()) {
-                List<Long> logFileIds = getLogFileIds();
-                if (logFileIds.isEmpty()) {
-                    fileId = nextLogFileId;
-                    createFileIfNotExists(getLogFilePath(fileId));
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("created a log file: " + getLogFilePath(fileId));
-                    }
-                } else {
-                    fileId = logFileIds.get(logFileIds.size() - 1);
-                    File logFile = new File(getLogFilePath(fileId));
-                    offset = logFile.length();
-                }
-            } else {
-                fileId = nextLogFileId;
-                createNewDirectory(logDir);
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("created the log directory: " + logManagerProperties.getLogDir());
-                }
-                createFileIfNotExists(getLogFilePath(fileId));
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("created a log file: " + getLogFilePath(fileId));
-                }
-            }
-        } catch (IOException ioe) {
-            throw new IllegalStateException("Failed to initialize the log anchor", ioe);
-        }
+    private long initializeLogAnchor(long fileId) {
+        final String logFilePath = getLogFilePath(fileId);
+        createFileIfNotExists(logFilePath);
+        final File logFile = new File(logFilePath);
+        long offset = logFile.length();
         if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("log file Id: " + fileId + ", offset: " + offset);
+            LOGGER.info("initializing log anchor with log file Id: {} at offset: {}", fileId, offset);
         }
-        return logFileSize * fileId + offset;
+        return getLogFileFirstLsn(fileId) + offset;
     }
 
     @Override
     public void renewLogFiles() {
         terminateLogFlusher();
         closeCurrentLogFile();
-        long lastMaxLogFileId = deleteAllLogFiles();
-        initializeLogManager(lastMaxLogFileId + 1);
+        long nextLogFileId = getNextLogFileId();
+        createFileIfNotExists(getLogFilePath(nextLogFileId));
+        final long logFileFirstLsn = getLogFileFirstLsn(nextLogFileId);
+        deleteOldLogFiles(logFileFirstLsn);
+        initializeLogManager(nextLogFileId);
     }
 
     @Override
     public void deleteOldLogFiles(long checkpointLSN) {
         Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
-        List<Long> logFileIds = getLogFileIds();
+        List<Long> logFileIds = getOrderedLogFileIds();
         if (!logFileIds.isEmpty()) {
             //sort log files from oldest to newest
             Collections.sort(logFileIds);
@@ -461,24 +437,7 @@
         }
     }
 
-    private long deleteAllLogFiles() {
-        List<Long> logFileIds = getLogFileIds();
-        if (!logFileIds.isEmpty()) {
-            for (Long id : logFileIds) {
-                File file = new File(getLogFilePath(id));
-                LOGGER.info("Deleting log file: " + file.getAbsolutePath());
-                if (!file.delete()) {
-                    throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
-                }
-                LOGGER.info("log file: " + file.getAbsolutePath() + " was deleted successfully");
-            }
-            return logFileIds.get(logFileIds.size() - 1);
-        } else {
-            throw new IllegalStateException("Couldn't find any log files.");
-        }
-    }
-
-    public List<Long> getLogFileIds() {
+    public List<Long> getOrderedLogFileIds() {
         File fileLogDir = new File(logDir);
         String[] logFileNames = null;
         List<Long> logFileIds = null;
@@ -510,12 +469,7 @@
         for (String fileName : logFileNames) {
             logFileIds.add(Long.parseLong(fileName.substring(logFilePrefix.length() + 1)));
         }
-        Collections.sort(logFileIds, new Comparator<Long>() {
-            @Override
-            public int compare(Long arg0, Long arg1) {
-                return arg0.compareTo(arg1);
-            }
-        });
+        logFileIds.sort(Long::compareTo);
         return logFileIds;
     }
 
@@ -531,17 +485,21 @@
         return lsn / logFileSize;
     }
 
-    private static boolean createFileIfNotExists(String path) throws IOException {
-        File file = new File(path);
-        File parentFile = file.getParentFile();
-        if (parentFile != null) {
-            parentFile.mkdirs();
+    private static void createFileIfNotExists(String path) {
+        try {
+            File file = new File(path);
+            if (file.exists()) {
+                return;
+            }
+            File parentFile = file.getParentFile();
+            if (parentFile != null) {
+                parentFile.mkdirs();
+            }
+            Files.createFile(file.toPath());
+            LOGGER.info("Created log file {}", path);
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to create file in " + path, e);
         }
-        return file.createNewFile();
-    }
-
-    private static boolean createNewDirectory(String path) {
-        return (new File(path)).mkdir();
     }
 
     private void createNextLogFile() throws IOException {
@@ -579,7 +537,7 @@
 
     @Override
     public long getReadableSmallestLSN() {
-        List<Long> logFileIds = getLogFileIds();
+        List<Long> logFileIds = getOrderedLogFileIds();
         if (!logFileIds.isEmpty()) {
             return logFileIds.get(0) * logFileSize;
         } else {
@@ -629,6 +587,22 @@
         fileChannel.close();
     }
 
+    private long getNextLogFileId() {
+        return getOnDiskMaxLogFileId() + 1;
+    }
+
+    private long getLogFileFirstLsn(long logFileId) {
+        return logFileId * logFileSize;
+    }
+
+    private long getOnDiskMaxLogFileId() {
+        final List<Long> logFileIds = getOrderedLogFileIds();
+        if (logFileIds.isEmpty()) {
+            return SMALLEST_LOG_FILE_ID;
+        }
+        return logFileIds.get(logFileIds.size() - 1);
+    }
+
     /**
      * This class is used to log FLUSH logs.
      * FLUSH logs are flushed on a different thread to avoid a possible deadlock in {@link LogBuffer} batchUnlock
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index 0cbd6c6..e221da8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -60,6 +60,7 @@
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     public static final long SHARP_CHECKPOINT_LSN = -1;
     private static final FilenameFilter filter = (File dir, String name) -> name.startsWith(CHECKPOINT_FILENAME_PREFIX);
+    private static final long FIRST_CHECKPOINT_ID = 0;
     private final File checkpointDir;
     private final int historyToKeep;
     private final int lsnThreshold;
@@ -88,25 +89,118 @@
         lsnThreshold = checkpointProperties.getLsnThreshold();
         pollFrequency = checkpointProperties.getPollFrequency();
         // We must keep at least the latest checkpoint
-        historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 1 : checkpointProperties.getHistoryToKeep();
+        historyToKeep = checkpointProperties.getHistoryToKeep() + 1;
         persistedResourceRegistry = txnSubsystem.getApplicationContext().getPersistedResourceRegistry();
     }
 
     @Override
-    public Checkpoint getLatest() throws ACIDException {
-        // Read all checkpointObjects from the existing checkpoint files
+    public Checkpoint getLatest() {
         LOGGER.log(Level.INFO, "Getting latest checkpoint");
+        final List<File> checkpointFiles = getCheckpointFiles();
+        if (checkpointFiles.isEmpty()) {
+            return null;
+        }
+        final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+        if (orderedCheckpoints.isEmpty()) {
+            /*
+             * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
+             * We will forge a checkpoint that forces recovery to start from the beginning of the log.
+             * This shouldn't happen unless a hardware corruption happens.
+             */
+            return forgeForceRecoveryCheckpoint();
+        }
+        return orderedCheckpoints.get(orderedCheckpoints.size() - 1);
+    }
+
+    @Override
+    public void start() {
+        checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
+        checkpointer.start();
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+        checkpointer.shutdown();
+        checkpointer.interrupt();
+        try {
+            // Wait until checkpoint thread stops
+            checkpointer.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+        // Nothing to dump
+    }
+
+    public Path getCheckpointPath(long checkpointId) {
+        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
+                + Long.toString(checkpointId));
+    }
+
+    protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
+        ILogManager logMgr = txnSubsystem.getLogManager();
+        ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
+        final long nextCheckpointId = getNextCheckpointId();
+        final Checkpoint checkpointObject = new Checkpoint(nextCheckpointId, logMgr.getAppendLSN(), minMCTFirstLSN,
+                txnMgr.getMaxTxnId(), sharp, StorageConstants.VERSION);
+        persist(checkpointObject);
+        cleanup();
+    }
+
+    private Checkpoint forgeForceRecoveryCheckpoint() {
+        /*
+         * Arguments follow Checkpoint(id, checkpointLsn, minMCTFirstLsn, maxTxnId, sharp, storageVersion).
+         * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
+         * the first available log. We set the storage version to the current version. If there is a version
+         * mismatch, it will be detected during recovery.
+         */
+        return new Checkpoint(FIRST_CHECKPOINT_ID, Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, false,
+                StorageConstants.VERSION);
+    }
+
+    private void persist(Checkpoint checkpoint) throws HyracksDataException {
+        // Get checkpoint file path
+        Path path = getCheckpointPath(checkpoint.getId());
+
+        if (LOGGER.isInfoEnabled()) {
+            File file = path.toFile();
+            LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which "
+                    + (file.exists() ? "already exists" : "doesn't exist yet"));
+        }
+        // Write checkpoint file to disk
+        try {
+            byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
+            Files.write(path, bytes);
+        } catch (IOException e) {
+            LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
+            throw HyracksDataException.create(e);
+        }
+        if (LOGGER.isInfoEnabled()) {
+            File file = path.toFile();
+            LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now "
+                    + (file.exists() ? "exists" : " still doesn't exist"));
+        }
+    }
+
+    private List<File> getCheckpointFiles() {
         File[] checkpoints = checkpointDir.listFiles(filter);
         if (checkpoints == null || checkpoints.length == 0) {
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.log(Level.INFO,
                         "Listing of files in the checkpoint dir returned " + (checkpoints == null ? "null" : "empty"));
             }
-            return null;
+            return Collections.emptyList();
         }
         if (LOGGER.isInfoEnabled()) {
             LOGGER.log(Level.INFO, "Listing of files in the checkpoint dir returned " + Arrays.toString(checkpoints));
         }
+        return Arrays.asList(checkpoints);
+    }
+
+    private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
         List<Checkpoint> checkpointObjectList = new ArrayList<>();
         for (File file : checkpoints) {
             try {
@@ -134,106 +228,30 @@
                 }
             }
         }
-        /**
-         * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
-         * We will forge a checkpoint that forces recovery to start from the beginning of the log.
-         * This shouldn't happen unless a hardware corruption happens.
-         */
-        if (checkpointObjectList.isEmpty()) {
-            LOGGER.error("All checkpoint files are corrupted. Forcing recovery from the beginning of the log");
-            checkpointObjectList.add(forgeForceRecoveryCheckpoint());
-        }
-
-        // Sort checkpointObjects in descending order by timeStamp to find out the most recent one.
         Collections.sort(checkpointObjectList);
-
-        // Return the most recent one (the first one in sorted list)
-        return checkpointObjectList.get(0);
-    }
-
-    @Override
-    public void start() {
-        checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
-        checkpointer.start();
-    }
-
-    @Override
-    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
-        checkpointer.shutdown();
-        checkpointer.interrupt();
-        try {
-            // Wait until checkpoint thread stops
-            checkpointer.join();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void dumpState(OutputStream os) throws IOException {
-        // Nothing to dump
-    }
-
-    public Path getCheckpointPath(long checkpointTimestamp) {
-        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
-                + Long.toString(checkpointTimestamp));
-    }
-
-    protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
-        ILogManager logMgr = txnSubsystem.getLogManager();
-        ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
-        Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxTxnId(),
-                System.currentTimeMillis(), sharp, StorageConstants.VERSION);
-        persist(checkpointObject);
-        cleanup();
-    }
-
-    protected Checkpoint forgeForceRecoveryCheckpoint() {
-        /**
-         * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
-         * the first available log.
-         * We set the storage version to the current version. If there is a version mismatch, it will be detected
-         * during recovery.
-         */
-        return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, System.currentTimeMillis(), false,
-                StorageConstants.VERSION);
-    }
-
-    private void persist(Checkpoint checkpoint) throws HyracksDataException {
-        // Get checkpoint file path
-        Path path = getCheckpointPath(checkpoint.getTimeStamp());
-
-        if (LOGGER.isInfoEnabled()) {
-            File file = path.toFile();
-            LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which "
-                    + (file.exists() ? "already exists" : "doesn't exist yet"));
-        }
-        // Write checkpoint file to disk
-        try {
-            byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
-            Files.write(path, bytes);
-        } catch (IOException e) {
-            LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
-            throw HyracksDataException.create(e);
-        }
-        if (LOGGER.isInfoEnabled()) {
-            File file = path.toFile();
-            LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now "
-                    + (file.exists() ? "exists" : " still doesn't exist"));
-        }
+        return checkpointObjectList;
     }
 
     private void cleanup() {
-        File[] checkpointFiles = checkpointDir.listFiles(filter);
-        // Sort the filenames lexicographically to keep the latest checkpoint history files.
-        Arrays.sort(checkpointFiles);
-        for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) {
-            if (LOGGER.isWarnEnabled()) {
-                LOGGER.warn("Deleting checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
-            }
-            if (!checkpointFiles[i].delete() && LOGGER.isWarnEnabled()) {
-                LOGGER.warn("Could not delete checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
+        final List<File> checkpointFiles = getCheckpointFiles();
+        final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+        final int deleteCount = orderedCheckpoints.size() - historyToKeep;
+        for (int i = 0; i < deleteCount; i++) {
+            final Checkpoint checkpoint = orderedCheckpoints.get(i);
+            final Path checkpointPath = getCheckpointPath(checkpoint.getId());
+            LOGGER.warn("Deleting checkpoint file at: {}", checkpointPath);
+            if (!checkpointPath.toFile().delete()) {
+                LOGGER.warn("Could not delete checkpoint file at: {}", checkpointPath);
             }
         }
     }
+
+    /**
+     * @return {@code FIRST_CHECKPOINT_ID} if {@code getLatest()} yields no checkpoint, otherwise the latest id + 1
+     */
+    private long getNextCheckpointId() {
+        final Checkpoint mostRecent = getLatest();
+        return mostRecent == null ? FIRST_CHECKPOINT_ID : mostRecent.getId() + 1;
+    }
+
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 6efd0e5..ce523db 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -38,7 +38,7 @@
 public class CheckpointManager extends AbstractCheckpointManager {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long NO_SECURED_LSN = -1l;
+    private static final long NO_SECURED_LSN = -1L;
     private final Map<TxnId, Long> securedLSNs;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 9ac7168..7c8fb34 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -107,6 +107,7 @@
     public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer) {
         if (failedJobsCache.getIfPresent(partitionId.getJobId()) != null) {
             writer.abort(AbstractChannelWriteInterface.REMOTE_ERROR_CODE);
+            return;
         }
         List<IPartition> pList = availablePartitionMap.get(partitionId);
         if (pList != null && !pList.isEmpty()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
index 0a28e93..5c927f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/AbstractChannelWriteInterface.java
@@ -75,7 +75,7 @@
         if (writableDataPresent) {
             return credits > 0;
         }
-        if (eos && !eosSent) {
+        if (isPendingCloseWrite()) {
             return true;
         }
         return ecode.get() == REMOTE_ERROR_CODE && !ecodeSent;
@@ -116,6 +116,10 @@
         return credits;
     }
 
+    protected boolean isPendingCloseWrite() {
+        return eos && !(eosSent || ecodeSent); // eos is set while neither eosSent nor ecodeSent is set
+    }
+
     private class CloseableBufferAcceptor implements ICloseableBufferAcceptor {
         @Override
         public void accept(ByteBuffer buffer) {
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
index 628007d..3f4618b 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelWriteInterface.java
@@ -58,7 +58,7 @@
             ecodeSent = true;
             ccb.reportLocalEOS();
             adjustChannelWritability();
-        } else if (eos && !eosSent) {
+        } else if (isPendingCloseWrite()) {
             writerState.getCommand().setChannelId(channelId);
             writerState.getCommand().setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
             writerState.getCommand().setData(0);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
index 7ac49c6..dc59612 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
@@ -36,7 +36,7 @@
          * Storage version #. Change this if you alter any tree frame formats to stop
          * possible corruption from old versions reading new formats.
          */
-        public static final int VERSION = 6;
+        public static final int VERSION = 7;
         public static final int TUPLE_COUNT_OFFSET = 0;
         public static final int FREE_SPACE_OFFSET = TUPLE_COUNT_OFFSET + 4;
         public static final int LEVEL_OFFSET = FREE_SPACE_OFFSET + 4;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
index 3fbe6cd..b6192c1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -43,4 +43,11 @@
      * Called once per batch before ending the batch process
      */
     void finish() throws HyracksDataException;
+
+    /**
+     * Called when a failure is encountered while processing a frame
+     *
+     * @param th the failure that was encountered
+     */
+    void fail(Throwable th);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 1f481c9..904029b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -81,12 +81,13 @@
             (dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX);
     protected static final Comparator<String> cmp = new FileNameComparator();
     private static final FilenameFilter dummyFilter = (dir, name) -> true;
-
+    private static final long UNINITALIZED_COMPONENT_SEQ = -1;
     protected final IIOManager ioManager;
     // baseDir should reflect dataset name and partition name and be absolute
     protected final FileReference baseDir;
     protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator();
     protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
+    private long lastUsedComponentSeq = UNINITALIZED_COMPONENT_SEQ;
 
     public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file,
             TreeIndexFactory<? extends ITreeIndex> treeFactory) {
@@ -355,11 +356,18 @@
     }
 
     protected String getNextComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
+        if (lastUsedComponentSeq == UNINITALIZED_COMPONENT_SEQ) {
+            lastUsedComponentSeq = getOnDiskLastUsedComponentSequence(filenameFilter);
+        }
+        return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq);
+    }
+
+    private long getOnDiskLastUsedComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException {
         long maxComponentSeq = -1;
         final String[] files = listDirFiles(baseDir, filenameFilter);
         for (String fileName : files) {
             maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd());
         }
-        return IndexComponentFileReference.getFlushSequence(maxComponentSeq + 1);
+        return maxComponentSeq;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index aa7be86..e9f6f20 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -700,6 +700,9 @@
             try {
                 processFrame(accessor, tuple, processor);
                 frameOpCallback.frameCompleted();
+            } catch (Throwable th) {
+                processor.fail(th);
+                throw th;
             } finally {
                 processor.finish();
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index f9d9b1b..e2ae73a 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -161,9 +161,12 @@
                 Field hooksField = clazz.getDeclaredField("hooks");
                 hooksField.setAccessible(true);
                 IdentityHashMap hooks = (IdentityHashMap) hooksField.get(null);
-                LOGGER.info("the following ({}) shutdown hooks have been registered: {}", hooks::size, hooks::toString);
+                if (hooks != null) {
+                    LOGGER.info("the following ({}) shutdown hooks have been registered: {}", hooks::size,
+                            hooks::toString);
+                }
             } catch (Exception e) {
-                LOGGER.warn("ignoring exception trying to determine number of shutdown hooks", e);
+                LOGGER.debug("ignoring exception trying to log shutdown hooks", e);
             }
         }
     }