Make job recovery memory configurable

- Make memory allocated per job during recovery configurable.
- Exclude bloom filter file from LSMBTree with no bloom filter.

Change-Id: Ief359eae2352408c0cbfd37b0e3a1e758c78e0dd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/852
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index abc03e2..c4a5e8e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -20,6 +20,9 @@
 
 import java.util.Map;
 
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
 public class AsterixTransactionProperties extends AbstractAsterixProperties {
 
     private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages";
@@ -55,6 +58,10 @@
     private static final String TXN_COMMIT_PROFILER_REPORT_INTERVAL_KEY = "txn.commitprofiler.reportinterval";
     private static final int TXN_COMMIT_PROFILER_REPORT_INTERVAL_DEFAULT = 5; // 5 seconds
 
+    private static final String TXN_JOB_RECOVERY_MEMORY_SIZE_KEY = "txn.job.recovery.memorysize";
+    private static final long TXN_JOB_RECOVERY_MEMORY_SIZE_DEFAULT = StorageUtil.getSizeInBytes(64L,
+            StorageUnit.MEGABYTE);
+
     public AsterixTransactionProperties(AsterixPropertiesAccessor accessor) {
         super(accessor);
     }
@@ -122,4 +129,8 @@
                 TXN_COMMIT_PROFILER_REPORT_INTERVAL_DEFAULT, PropertyInterpreters.getIntegerPropertyInterpreter());
     }
 
+    public long getJobRecoveryMemorySize() {
+        return accessor.getProperty(TXN_JOB_RECOVERY_MEMORY_SIZE_KEY, TXN_JOB_RECOVERY_MEMORY_SIZE_DEFAULT,
+                PropertyInterpreters.getLongPropertyInterpreter());
+    }
 }
diff --git a/asterixdb/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterixdb/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index 17a4b3a..f6da341 100644
--- a/asterixdb/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterixdb/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -196,6 +196,14 @@
   </property>
 
   <property>
+    <name>txn.job.recovery.memorysize</name>
+    <value>67108864</value>
+    <description>The amount of memory in bytes allocated per job during recovery.
+     (Default = "67108864" // 64MB)
+    </description>
+  </property>
+
+  <property>
     <name>compiler.sortmemory</name>
     <value>33554432</value>
     <description>The amount of memory in bytes given to sort operations.
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 0ec6b10..3e5c6cf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -93,9 +93,8 @@
     private final boolean replicationEnabled;
     public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
     private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
-    private static final long MEGABYTE = 1024L * 1024L;
     private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
-    private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //4MB;
+    private final long cachedEntityCommitsPerJobSize;
     private final PersistentLocalResourceRepository localResourceRepository;
 
     /**
@@ -114,6 +113,7 @@
         replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
         localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
+        cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
     }
 
     /**
@@ -422,8 +422,8 @@
         }
     }
 
-    private static boolean needToFreeMemory() {
-        return Runtime.getRuntime().freeMemory() < MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE;
+    private boolean needToFreeMemory() {
+        return Runtime.getRuntime().freeMemory() < cachedEntityCommitsPerJobSize;
     }
 
     @Override
@@ -902,7 +902,7 @@
             partitionMaxLSN = logRecord.getLSN();
             currentPartitionSize += winnerEntity.getCurrentSize();
             //if the memory budget for the current partition exceeded the limit, spill it to disk and free memory
-            if (currentPartitionSize >= MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE) {
+            if (currentPartitionSize >= cachedEntityCommitsPerJobSize) {
                 spillToDiskAndfreeMemory();
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 969828e..9e75360 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -581,7 +581,7 @@
 
     protected LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
             FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
-            throws HyracksDataException, IndexException {
+                    throws HyracksDataException, IndexException {
         // Create new BTree instance.
         LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
                 .createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -878,7 +878,9 @@
         Set<String> files = new HashSet<String>();
         LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent;
         files.add(component.getBTree().getFileReference().getFile().getAbsolutePath());
-        files.add(component.getBloomFilter().getFileReference().getFile().getAbsolutePath());
+        if (hasBloomFilter) {
+            files.add(component.getBloomFilter().getFileReference().getFile().getAbsolutePath());
+        }
         return files;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
new file mode 100644
index 0000000..4607214
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+/**
+ * Utility methods for converting between binary (1024-based) storage units and byte counts.
+ */
+public class StorageUtil {
+
+    // Declared as long: 1024^4 (TB) and 1024^5 (PB) do not fit in an int and would silently wrap to 0.
+    private static final long BASE = 1024L, KB = BASE, MB = KB * BASE, GB = MB * BASE, TB = GB * BASE, PB = TB * BASE;
+
+    public enum StorageUnit {
+        BYTE,
+        KILOBYTE,
+        MEGABYTE,
+        GIGABYTE,
+        TERABYTE,
+        PETABYTE
+    }
+
+    private StorageUtil() {
+        throw new AssertionError("This util class should not be initialized.");
+    }
+
+    /**
+     * Converts a size expressed in the given unit to bytes, as an int.
+     *
+     * @param size the number of units
+     * @param unit the unit {@code size} is expressed in
+     * @return {@code size} converted to bytes
+     * @throws IllegalArgumentException if the result does not fit in an int
+     */
+    public static int getSizeInBytes(final int size, final StorageUnit unit) {
+        final long bytes = getSizeInBytes((long) size, unit);
+        if (bytes < Integer.MIN_VALUE || bytes > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException(size + " " + unit + " does not fit in an int");
+        }
+        return (int) bytes;
+    }
+
+    /**
+     * Converts a size expressed in the given unit to bytes.
+     *
+     * @param size the number of units
+     * @param unit the unit {@code size} is expressed in
+     * @return {@code size} converted to bytes
+     * @throws IllegalArgumentException if the result does not fit in a long
+     */
+    public static long getSizeInBytes(final long size, final StorageUnit unit) {
+        final long bytesPerUnit = getBytesPerUnit(unit);
+        final long bytes = size * bytesPerUnit;
+        // bytesPerUnit is positive, so a quotient mismatch can only mean the multiplication wrapped around.
+        if (size != 0 && bytes / size != bytesPerUnit) {
+            throw new IllegalArgumentException(size + " " + unit + " does not fit in a long");
+        }
+        return bytes;
+    }
+
+    /**
+     * Returns the number of bytes in one {@code unit}.
+     */
+    private static long getBytesPerUnit(final StorageUnit unit) {
+        switch (unit) {
+            case BYTE:
+                return 1L;
+            case KILOBYTE:
+                return KB;
+            case MEGABYTE:
+                return MB;
+            case GIGABYTE:
+                return GB;
+            case TERABYTE:
+                return TB;
+            case PETABYTE:
+                return PB;
+            default:
+                throw new IllegalStateException("Unsupported unit: " + unit);
+        }
+    }
+
+    /**
+     * Returns a human readable value in storage units rounded to two decimal places.
+     * e.g. toHumanReadableSize(1024L * 1024L * 1024L + 1024L * 1024L * 59L) returns 1.06 GB
+     * Values below 1024 (including negative values) are returned as a plain byte count, e.g. "512 B".
+     *
+     * @param bytes the size in bytes
+     * @return Value in storage units.
+     */
+    public static String toHumanReadableSize(final long bytes) {
+        if (bytes < BASE) {
+            return bytes + " B";
+        }
+        final int baseValue = (63 - Long.numberOfLeadingZeros(bytes)) / 10;
+        return String.format("%.2f %sB", (double) bytes / (1L << (baseValue * 10)), " kMGTPE".charAt(baseValue));
+    }
+}