[ASTERIXDB-1995][STO] Abort write txn when index cannot be flushed
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Fix LSM memory component state transition on flush/merge failure
- When index cannot be flushed, abort waiting threads
- Prevent NPE in MaterializerTaskState when file creation fails
- Check parent dirs creation for index metadata file
Change-Id: I28592c30c788f4a6f44db8b47a84bc77f6b3f8f3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1896
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7dbade2..e6fbc6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -121,6 +121,7 @@
public static final int FOUND_MULTIPLE_TRANSACTIONS = 85;
public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86;
public static final int UNEQUAL_NUM_FILTERS_TREES = 87;
+ public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index cd38917..d2e05e3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -104,5 +104,6 @@
85 = Found more than one transaction file in %1$s
86 = Found an unrecognized index file %1$s
87 = Unequal number of trees and filters found in %1$s
+88 = Cannot modify index (Disk is full)
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 918155d..31cbaad 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -66,7 +66,9 @@
}
public void close() throws HyracksDataException {
- out.close();
+ // out may be null (e.g. when creating the run file failed), so only close it if it was created
+ if (out != null) {
+ out.close();
+ }
}
public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
@@ -74,20 +76,25 @@
}
public void writeOut(IFrameWriter writer, IFrame frame, boolean failed) throws HyracksDataException {
- RunFileReader in = out.createReader();
+ RunFileReader in = null;
+ if (out != null) {
+ in = out.createReader();
+ }
writer.open();
try {
if (failed) {
writer.fail();
return;
}
- in.open();
- try {
- while (in.nextFrame(frame)) {
- writer.nextFrame(frame.getBuffer());
+ if (in != null) {
+ in.open();
+ try {
+ while (in.nextFrame(frame)) {
+ writer.nextFrame(frame.getBuffer());
+ }
+ } finally {
+ in.close();
}
- } finally {
- in.close();
}
} catch (Exception e) {
writer.fail();
@@ -96,10 +103,10 @@
try {
writer.close();
} finally {
- if (numConsumers.decrementAndGet() == 0) {
+ if (numConsumers.decrementAndGet() == 0 && out != null) {
out.getFileReference().delete();
}
}
}
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 7cbe35f..1ee68d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -144,6 +144,11 @@
throw new IllegalStateException("Flush sees an illegal LSM memory compoenent state: " + state);
}
readerCount--;
+ if (failedOperation) {
+ // if flush failed, return the component state to READABLE_UNWRITABLE
+ state = ComponentState.READABLE_UNWRITABLE;
+ return;
+ }
if (readerCount == 0) {
state = ComponentState.INACTIVE;
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 50eac67..8ff907a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -131,6 +132,10 @@
// Flush and merge operations should never reach this wait call, because they are always try operations.
// If they fail to enter the components, then it means that there are an ongoing flush/merge operation on
// the same components, so they should not proceed.
+ if (opType == LSMOperationType.MODIFICATION) {
+ // before waiting, make sure the index is in a modifiable state to avoid waiting forever.
+ ensureIndexModifiable();
+ }
opTracker.wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
@@ -186,6 +191,7 @@
break;
case MERGE:
lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
+ break;
default:
break;
}
@@ -498,15 +504,17 @@
}
ILSMDiskComponent newComponent = null;
+ boolean failedOperation = false;
try {
newComponent = lsmIndex.flush(operation);
operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
lsmIndex.markAsValid(newComponent);
} catch (Throwable e) {
+ failedOperation = true;
e.printStackTrace();
throw e;
} finally {
- exitComponents(ctx, LSMOperationType.FLUSH, newComponent, false);
+ exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
operation.getCallback().afterFinalize(LSMOperationType.FLUSH, newComponent);
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -545,15 +553,17 @@
}
ILSMDiskComponent newComponent = null;
+ boolean failedOperation = false;
try {
newComponent = lsmIndex.merge(operation);
operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
lsmIndex.markAsValid(newComponent);
} catch (Throwable e) {
+ failedOperation = true;
e.printStackTrace();
throw e;
} finally {
- exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
+ exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -660,4 +670,23 @@
exit(ctx);
}
}
+
+ /**
+ * Ensures the index is, or will eventually become, modifiable.
+ * <p>
+ * Called before a MODIFICATION operation waits on the operation tracker. At least one memory component
+ * must be INACTIVE, READABLE_WRITABLE or READABLE_UNWRITABLE_FLUSHING. Otherwise no component is expected
+ * to become writable again and the caller would wait forever. This happens, for example, when every
+ * memory component was returned to READABLE_UNWRITABLE because its flush failed.
+ *
+ * @throws HyracksDataException
+ *             with {@code CANNOT_MODIFY_INDEX_DISK_IS_FULL} if no memory component is, or will become, writable
+ */
+ private void ensureIndexModifiable() throws HyracksDataException {
+ // find if there is any memory component which is in a writable state or eventually will be in a writable state
+ for (ILSMMemoryComponent memoryComponent : lsmIndex.getMemoryComponents()) {
+ switch (memoryComponent.getState()) {
+ case INACTIVE:
+ case READABLE_WRITABLE:
+ case READABLE_UNWRITABLE_FLUSHING:
+ return;
+ default:
+ // continue to the next component
+ }
+ }
+ throw HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
index 5a68df0..3b03fce 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
@@ -64,6 +64,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java
new file mode 100644
index 0000000..9a65d72
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.SystemUtils;
+
+public class DiskUtil {
+
+ private static final Logger LOGGER = Logger.getLogger(DiskUtil.class.getName());
+
+ private DiskUtil() {
+ throw new AssertionError("Util class should not be initialized.");
+ }
+
+ /**
+ * Mounts a RAM disk
+ *
+ * @param name
+ * @param size
+ * @param unit
+ * @return The root of the mounted disk
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static Path mountRamDisk(String name, int size, StorageUtil.StorageUnit unit)
+ throws IOException, InterruptedException {
+ if (SystemUtils.IS_OS_MAC) {
+ return mountMacRamDisk(name, (StorageUtil.getIntSizeInBytes(size, unit) * 2) / StorageUtil.BASE);
+ } else if (SystemUtils.IS_OS_LINUX) {
+ return mountLinuxRamDisk(name, size + unit.getLinuxUnitTypeInLetter());
+ }
+ throw new UnsupportedOperationException("Unsupported OS: " + System.getProperty("os.name"));
+ }
+
+ /**
+ * Unmounts a disk
+ *
+ * @param name
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public static void unmountRamDisk(String name) throws IOException, InterruptedException {
+ if (SystemUtils.IS_OS_MAC) {
+ unmountMacRamDisk(name);
+ } else if (SystemUtils.IS_OS_LINUX) {
+ unmountLinuxRamDisk(name);
+ }
+ }
+
+ private static Path mountMacRamDisk(String name, long size) throws IOException, InterruptedException {
+ final String cmd = "diskutil erasevolume HFS+ '" + name + "' `hdiutil attach -nomount ram://" + size + "`";
+ final ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", cmd);
+ final Process p = pb.start();
+ watchProcess(p);
+ p.waitFor();
+ return Paths.get("/Volumes", name);
+ }
+
+ private static void unmountMacRamDisk(String name) throws InterruptedException, IOException {
+ final String cmd = "diskutil unmount " + name;
+ final ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", cmd);
+ final Process p = pb.start();
+ watchProcess(p);
+ p.waitFor();
+ }
+
+ private static Path mountLinuxRamDisk(String name, String size) throws IOException, InterruptedException {
+ Path root = Paths.get("/tmp", name);
+ if (!Files.exists(root)) {
+ Files.createFile(root);
+ }
+ final String cmd = "mount -o size=" + size + " -t tmpfs none /tmp/" + name;
+ final ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd);
+ final Process p = pb.start();
+ watchProcess(p);
+ p.waitFor();
+ return root;
+ }
+
+ private static void unmountLinuxRamDisk(String name) throws InterruptedException, IOException {
+ final String cmd = "umount /tmp/" + name;
+ final ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd);
+ final Process p = pb.start();
+ watchProcess(p);
+ p.waitFor();
+ }
+
+ private static void watchProcess(Process p) {
+ new Thread(() -> {
+ final BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+ String line;
+ try {
+ while ((line = input.readLine()) != null) {
+ LOGGER.info(line);
+ }
+ } catch (IOException e) {
+ LOGGER.log(Level.WARNING, e.getMessage(), e);
+ }
+ }).start();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
index 9001e1b..dbfe6f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
@@ -23,17 +23,18 @@
public class StorageUtil {
- private static final int BASE = 1024;
+ public static final int BASE = 1024;
public enum StorageUnit {
- BYTE("B", 1),
- KILOBYTE("KB", BASE),
- MEGABYTE("MB", KILOBYTE.multiplier * BASE),
- GIGABYTE("GB", MEGABYTE.multiplier * BASE),
- TERABYTE("TB", GIGABYTE.multiplier * BASE),
- PETABYTE("PB", TERABYTE.multiplier * BASE);
+ BYTE("B", "b", 1),
+ KILOBYTE("KB", "kb", BASE),
+ MEGABYTE("MB", "m", KILOBYTE.multiplier * BASE),
+ GIGABYTE("GB", "g", MEGABYTE.multiplier * BASE),
+ TERABYTE("TB", "t", GIGABYTE.multiplier * BASE),
+ PETABYTE("PB", "p", TERABYTE.multiplier * BASE);
private final String unitTypeInLetter;
+ private final String linuxUnitTypeInLetter;
private final long multiplier;
private static final Map<String, StorageUnit> SUFFIX_TO_UNIT_MAP = new HashMap<>();
@@ -43,8 +44,9 @@
}
}
- StorageUnit(String unitTypeInLetter, long multiplier) {
+ StorageUnit(String unitTypeInLetter, String linuxUnitTypeInLetter, long multiplier) {
this.unitTypeInLetter = unitTypeInLetter;
+ this.linuxUnitTypeInLetter = linuxUnitTypeInLetter;
this.multiplier = multiplier;
}
@@ -57,6 +59,10 @@
return value * multiplier;
}
+ /**
+ * @return the suffix appended to a size when building the Linux tmpfs {@code size=} mount option
+ *         <p>
+ *         NOTE(review): tmpfs appears to accept only k/m/g/t/p suffixes (or none for plain bytes); the "kb"
+ *         and "b" values given to KILOBYTE and BYTE look invalid there. Verify before relying on them.
+ */
+ public String getLinuxUnitTypeInLetter() {
+ return linuxUnitTypeInLetter;
+ }
+
public static StorageUnit lookupBySuffix(String name) {
return SUFFIX_TO_UNIT_MAP.get(name);
}