Fix for issue 923

Change-Id: I87053315fc7650682fcbedd573b1155c17810073
Reviewed-on: https://asterix-gerrit.ics.uci.edu/347
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 90a812b..2146eff 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -65,6 +65,7 @@
     private LogPage appendPage;
     private LogFlusher logFlusher;
     private Future<Object> futureLogFlusher;
+    private static final long SMALLEST_LOG_FILE_ID = 0;
 
     public LogManager(TransactionSubsystem txnSubsystem) throws ACIDException {
         this.txnSubsystem = txnSubsystem;
@@ -77,7 +78,7 @@
         logFilePrefix = logManagerProperties.getLogFilePrefix();
         flushLSN = new MutableLong();
         appendLSN = new AtomicLong();
-        initializeLogManager(0);
+        initializeLogManager(SMALLEST_LOG_FILE_ID);
     }
 
     private void initializeLogManager(long nextLogFileId) {
@@ -96,7 +97,7 @@
         logFlusher = new LogFlusher(this, emptyQ, flushQ);
         futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
     }
-    
+
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
         if (logRecord.getLogSize() > logPageSize) {
@@ -104,7 +105,7 @@
         }
 
         syncLog(logRecord);
-        
+
         if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
                 && !logRecord.isFlushed()) {
             synchronized (logRecord) {
@@ -121,12 +122,12 @@
 
     private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
         ITransactionContext txnCtx = null;
-        
-        if(logRecord.getLogType() != LogType.FLUSH)
-        {       
+
+        if (logRecord.getLogType() != LogType.FLUSH) {
             txnCtx = logRecord.getTxnCtx();
             if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
-                throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+                throw new ACIDException("Aborted job(" + txnCtx.getJobId()
+                        + ") tried to write non-abort type log record.");
             }
 
         }
@@ -142,9 +143,8 @@
             logRecord.setPrevLSN(txnCtx.getLastLSN());
         }
         appendPage.append(logRecord, appendLSN.get());
-        
-        if(logRecord.getLogType() == LogType.FLUSH)
-        {
+
+        if (logRecord.getLogType() == LogType.FLUSH) {
             logRecord.setLSN(appendLSN.get());
         }
         appendLSN.addAndGet(logRecord.getLogSize());
@@ -292,22 +292,22 @@
         initializeLogManager(lastMaxLogFileId + 1);
     }
 
-    public void deleteOldLogFiles(long checkpointLSN){
+    public void deleteOldLogFiles(long checkpointLSN) {
 
         Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
         List<Long> logFileIds = getLogFileIds();
-        for (Long id : logFileIds) {
-
-            if(id < checkpointLSNLogFileID)
-            {
-                File file = new File(getLogFilePath(id));
-                if (!file.delete()) {
-                    throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
+        if (logFileIds != null) {
+            for (Long id : logFileIds) {
+                if (id < checkpointLSNLogFileID) {
+                    File file = new File(getLogFilePath(id));
+                    if (!file.delete()) {
+                        throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
+                    }
                 }
             }
         }
     }
-    
+
     private void terminateLogFlusher() {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Terminating LogFlusher thread ...");
@@ -336,13 +336,17 @@
             }
         }
         List<Long> logFileIds = getLogFileIds();
-        for (Long id : logFileIds) {
-            File file = new File(getLogFilePath(id));
-            if (!file.delete()) {
-                throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
+        if (logFileIds != null) {
+            for (Long id : logFileIds) {
+                File file = new File(getLogFilePath(id));
+                if (!file.delete()) {
+                    throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
+                }
             }
+            return logFileIds.get(logFileIds.size() - 1);
+        } else {
+            throw new IllegalStateException("Couldn't find any log files.");
         }
-        return logFileIds.get(logFileIds.size() - 1);
     }
 
     private List<Long> getLogFileIds() {
@@ -425,7 +429,11 @@
 
     public long getReadableSmallestLSN() {
         List<Long> logFileIds = getLogFileIds();
-        return logFileIds.get(0) * logFileSize;
+        if (logFileIds != null) {
+            return logFileIds.get(0) * logFileSize;
+        }else{
+            throw new IllegalStateException("Couldn't find any log files.");
+        }
     }
 }