Make job recovery memory configurable
- Make memory allocated per job during recovery configurable.
- Exclude bloom filter file from LSMBTree with no bloom filter.
Change-Id: Ief359eae2352408c0cbfd37b0e3a1e758c78e0dd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/852
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index abc03e2..c4a5e8e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -20,6 +20,9 @@
import java.util.Map;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+
public class AsterixTransactionProperties extends AbstractAsterixProperties {
private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages";
@@ -55,6 +58,10 @@
private static final String TXN_COMMIT_PROFILER_REPORT_INTERVAL_KEY = "txn.commitprofiler.reportinterval";
private static final int TXN_COMMIT_PROFILER_REPORT_INTERVAL_DEFAULT = 5; // 5 seconds
+ private static final String TXN_JOB_RECOVERY_MEMORY_SIZE_KEY = "txn.job.recovery.memorysize";
+ private static final long TXN_JOB_RECOVERY_MEMORY_SIZE_DEFAULT = StorageUtil.getSizeInBytes(64L,
+ StorageUnit.MEGABYTE);
+
public AsterixTransactionProperties(AsterixPropertiesAccessor accessor) {
super(accessor);
}
@@ -122,4 +129,8 @@
TXN_COMMIT_PROFILER_REPORT_INTERVAL_DEFAULT, PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public long getJobRecoveryMemorySize() {
+ return accessor.getProperty(TXN_JOB_RECOVERY_MEMORY_SIZE_KEY, TXN_JOB_RECOVERY_MEMORY_SIZE_DEFAULT,
+ PropertyInterpreters.getLongPropertyInterpreter());
+ }
}
diff --git a/asterixdb/asterix-installer/src/main/resources/conf/asterix-configuration.xml b/asterixdb/asterix-installer/src/main/resources/conf/asterix-configuration.xml
index 17a4b3a..f6da341 100644
--- a/asterixdb/asterix-installer/src/main/resources/conf/asterix-configuration.xml
+++ b/asterixdb/asterix-installer/src/main/resources/conf/asterix-configuration.xml
@@ -196,6 +196,14 @@
</property>
<property>
+ <name>txn.job.recovery.memorysize</name>
+ <value>67108864</value>
+ <description>The memory allocated per job during recovery.
+ (Default = "67108864" // 64MB)
+ </description>
+ </property>
+
+ <property>
<name>compiler.sortmemory</name>
<value>33554432</value>
<description>The amount of memory in bytes given to sort operations.
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 0ec6b10..3e5c6cf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -93,9 +93,8 @@
private final boolean replicationEnabled;
public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
- private static final long MEGABYTE = 1024L * 1024L;
private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
- private static final long MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE = 4 * MEGABYTE; //4MB;
+ private final long cachedEntityCommitsPerJobSize;
private final PersistentLocalResourceRepository localResourceRepository;
/**
@@ -114,6 +113,7 @@
replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
.getLocalResourceRepository();
+ cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
}
/**
@@ -422,8 +422,8 @@
}
}
- private static boolean needToFreeMemory() {
- return Runtime.getRuntime().freeMemory() < MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE;
+ private boolean needToFreeMemory() {
+ return Runtime.getRuntime().freeMemory() < cachedEntityCommitsPerJobSize;
}
@Override
@@ -902,7 +902,7 @@
partitionMaxLSN = logRecord.getLSN();
currentPartitionSize += winnerEntity.getCurrentSize();
//if the memory budget for the current partition exceeded the limit, spill it to disk and free memory
- if (currentPartitionSize >= MAX_CACHED_ENTITY_COMMITS_PER_JOB_SIZE) {
+ if (currentPartitionSize >= cachedEntityCommitsPerJobSize) {
spillToDiskAndfreeMemory();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 969828e..9e75360 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -581,7 +581,7 @@
protected LSMBTreeDiskComponent createDiskComponent(LSMBTreeDiskComponentFactory factory,
FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean createComponent)
- throws HyracksDataException, IndexException {
+ throws HyracksDataException, IndexException {
// Create new BTree instance.
LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) factory
.createLSMComponentInstance(new LSMComponentFileReferences(btreeFileRef, null, bloomFilterFileRef));
@@ -878,7 +878,9 @@
Set<String> files = new HashSet<String>();
LSMBTreeDiskComponent component = (LSMBTreeDiskComponent) lsmComponent;
files.add(component.getBTree().getFileReference().getFile().getAbsolutePath());
- files.add(component.getBloomFilter().getFileReference().getFile().getAbsolutePath());
+ if (hasBloomFilter) {
+ files.add(component.getBloomFilter().getFileReference().getFile().getAbsolutePath());
+ }
return files;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
new file mode 100644
index 0000000..4607214
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+public class StorageUtil {
+
+ private static final int BASE = 1024, KB = BASE, MB = KB * BASE, GB = MB * BASE, TB = GB * BASE, PB = TB * BASE;
+
+ public enum StorageUnit {
+ BYTE,
+ KILOBYTE,
+ MEGABYTE,
+ GIGABYTE,
+ TERABYTE,
+ PETABYTE
+ }
+
+ private StorageUtil() {
+ throw new AssertionError("This util class should not be initialized.");
+ }
+
+ public static int getSizeInBytes(final int size, final StorageUnit unit) {
+ switch (unit) {
+ case BYTE:
+ return size;
+ case KILOBYTE:
+ return size * KB;
+ case MEGABYTE:
+ return size * MB;
+ case GIGABYTE:
+ return size * GB;
+ case TERABYTE:
+ return size * TB;
+ default:
+ throw new IllegalStateException("Unsupported unti: " + unit);
+ }
+ }
+
+ public static long getSizeInBytes(final long size, final StorageUnit unit) {
+ switch (unit) {
+ case BYTE:
+ return size;
+ case KILOBYTE:
+ return size * KB;
+ case MEGABYTE:
+ return size * MB;
+ case GIGABYTE:
+ return size * GB;
+ case TERABYTE:
+ return size * TB;
+ case PETABYTE:
+ return size * PB;
+ default:
+ throw new IllegalStateException("Unsupported unti: " + unit);
+ }
+ }
+
+ /**
+ * Returns a human readable value in storage units rounded up to two decimal places.
+ * e.g. toHumanReadableSize(1024L * 1024L * 1024l * 10L *) + 1024l * 1024l * 59) returns 1.06 GB
+ *
+ * @param bytes
+ * @return Value in storage units.
+ */
+ public static String toHumanReadableSize(final long bytes) {
+ if (bytes < BASE) {
+ return bytes + " B";
+ }
+ final int baseValue = (63 - Long.numberOfLeadingZeros(bytes)) / 10;
+ return String.format("%.2f %sB", (double) bytes / (1L << (baseValue * 10)), " kMGTPE".charAt(baseValue));
+ }
+}