[ASTERIXDB-1995][STO] Abort write txn when index cannot be flushed

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Fix LSM memory component state transition on flush/merge failure
- When index cannot be flushed, abort waiting threads
- Prevent NPE in MateralizerTaskState when file creation fails
- Check parent dirs creation for index metadata file

Change-Id: I28592c30c788f4a6f44db8b47a84bc77f6b3f8f3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1896
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index dea5259..2799765 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -64,6 +64,8 @@
     public NodeControllerService[] ncs = new NodeControllerService[0];
     public IHyracksClientConnection hcc;
 
+    private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir");
+    private static String storagePath = DEFAULT_STORAGE_PATH;
     private ConfigManager configManager;
     private List<String> nodeNames;
 
@@ -217,8 +219,16 @@
         }
     }
 
+    public static void setStoragePath(String path) {
+        storagePath = path;
+    }
+
+    public static void restoreDefaultStoragePath() {
+        storagePath = DEFAULT_STORAGE_PATH;
+    }
+
     protected String getDefaultStoragePath() {
-        return joinPath("target", "io", "dir");
+        return storagePath;
     }
 
     public void removeTestStorageFiles() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
new file mode 100644
index 0000000..58697a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.util.DiskUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DiskIsFullTest {
+
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATOR = { Index.RECORD_INDICATOR };
+    private static final List<Integer> KEY_INDICATOR_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+    private static final int DATASET_ID = 101;
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+    private static final String TEST_DISK_NAME = "asterixdb_ram_disk";
+    private boolean shouldRun = true;
+
+    @Before
+    public void setUp() throws Exception {
+        if (!SystemUtils.IS_OS_MAC) {
+            System.out.println("Skipping test " + DiskIsFullTest.class.getName() + " due to unsupported OS");
+            shouldRun = false;
+            return;
+        }
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        // create RAM disk
+        final Path ramDiskRoot = DiskUtil.mountRamDisk(TEST_DISK_NAME, 4, MEGABYTE);
+        // Use RAM disk for storage
+        AsterixHyracksIntegrationUtil.setStoragePath(ramDiskRoot.toAbsolutePath().toString());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (!shouldRun) {
+            return;
+        }
+        System.out.println("TearDown");
+        TestHelper.deleteExistingInstanceFiles();
+        DiskUtil.unmountRamDisk(TEST_DISK_NAME);
+        AsterixHyracksIntegrationUtil.restoreDefaultStoragePath();
+    }
+
+    @Test
+    public void testDiskIsFull() {
+        if (!shouldRun) {
+            return;
+        }
+        HyracksDataException expectedException =
+                HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL);
+        try {
+            TestNodeController nc = new TestNodeController(null, false);
+            nc.init();
+            StorageComponentProvider storageManager = new StorageComponentProvider();
+            List<List<String>> partitioningKeys = new ArrayList<>();
+            partitioningKeys.add(Collections.singletonList("key"));
+            Dataset dataset =
+                    new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, null,
+                            null,
+                            new InternalDatasetDetails(null, PartitioningStrategy.HASH, partitioningKeys, null, null,
+                                    null, false, null, false), null, DatasetType.INTERNAL, DATASET_ID, 0);
+            try {
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
+                        null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+                IHyracksTaskContext ctx = nc.createTestContext(false);
+                nc.newJobId();
+                ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                // Prepare insert operation
+                LSMInsertDeleteOperatorNodePushable insertOp =
+                        nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+                                new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
+                                .getLeft();
+                insertOp.open();
+                TupleGenerator tupleGenerator =
+                        new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION,
+                                UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+                VSizeFrame frame = new VSizeFrame(ctx);
+                FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+                // Insert records until disk becomes full
+                int tupleCount = 100000;
+                while (tupleCount > 0) {
+                    ITupleReference tuple = tupleGenerator.next();
+                    try {
+                        DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                    } catch (Throwable t) {
+                        final Throwable rootCause = ExceptionUtils.getRootCause(t);
+                        rootCause.printStackTrace();
+                        if (rootCause instanceof HyracksDataException) {
+                            HyracksDataException cause = (HyracksDataException) rootCause;
+                            Assert.assertEquals(cause.getErrorCode(), expectedException.getErrorCode());
+                            Assert.assertEquals(cause.getMessage(), expectedException.getMessage());
+                            return;
+                        } else {
+                            break;
+                        }
+                    }
+                    tupleCount--;
+                }
+                Assert.fail("Expected exception (" + expectedException + ") was not thrown");
+            } finally {
+                nc.deInit();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail("Expected exception (" + expectedException + ") was not thrown");
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index e530bc3..b117cf1 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
+import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -190,10 +192,12 @@
         FileReference resourceFile = ioManager.resolve(relativePath);
         if (resourceFile.getFile().exists()) {
             throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
-        } else {
-            resourceFile.getFile().getParentFile().mkdirs();
         }
-        resourceCache.put(resource.getPath(), resource);
+
+        final File parent = resourceFile.getFile().getParentFile();
+        if (!parent.exists() && !parent.mkdirs()) {
+            throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
+        }
 
         try (FileOutputStream fos = new FileOutputStream(resourceFile.getFile());
                 ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
@@ -203,6 +207,8 @@
             throw new HyracksDataException(e);
         }
 
+        resourceCache.put(resource.getPath(), resource);
+
         //if replication enabled, send resource metadata info to remote nodes
         if (isReplicationEnabled) {
             createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 7dbade2..e6fbc6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -121,6 +121,7 @@
     public static final int FOUND_MULTIPLE_TRANSACTIONS = 85;
     public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86;
     public static final int UNEQUAL_NUM_FILTERS_TREES = 87;
+    public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index cd38917..d2e05e3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -104,5 +104,6 @@
 85 = Found more than one transaction file in %1$s
 86 = Found an unrecognized index file %1$s
 87 = Unequal number of trees and filters found in %1$s
+88 = Cannot modify index (Disk is full)
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index 918155d..31cbaad 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -66,7 +66,9 @@
     }
 
     public void close() throws HyracksDataException {
-        out.close();
+        if (out != null) {
+            out.close();
+        }
     }
 
     public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
@@ -74,20 +76,25 @@
     }
 
     public void writeOut(IFrameWriter writer, IFrame frame, boolean failed) throws HyracksDataException {
-        RunFileReader in = out.createReader();
+        RunFileReader in = null;
+        if (out != null) {
+            in = out.createReader();
+        }
         writer.open();
         try {
             if (failed) {
                 writer.fail();
                 return;
             }
-            in.open();
-            try {
-                while (in.nextFrame(frame)) {
-                    writer.nextFrame(frame.getBuffer());
+            if (in != null) {
+                in.open();
+                try {
+                    while (in.nextFrame(frame)) {
+                        writer.nextFrame(frame.getBuffer());
+                    }
+                } finally {
+                    in.close();
                 }
-            } finally {
-                in.close();
             }
         } catch (Exception e) {
             writer.fail();
@@ -96,10 +103,10 @@
             try {
                 writer.close();
             } finally {
-                if (numConsumers.decrementAndGet() == 0) {
+                if (numConsumers.decrementAndGet() == 0 && out != null) {
                     out.getFileReference().delete();
                 }
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 7cbe35f..1ee68d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -144,6 +144,11 @@
                     throw new IllegalStateException("Flush sees an illegal LSM memory compoenent state: " + state);
                 }
                 readerCount--;
+                if (failedOperation) {
+                    // if flush failed, return the component state to READABLE_UNWRITABLE
+                    state = ComponentState.READABLE_UNWRITABLE;
+                    return;
+                }
                 if (readerCount == 0) {
                     state = ComponentState.INACTIVE;
                 } else {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 50eac67..8ff907a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -47,6 +47,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -131,6 +132,10 @@
                     // Flush and merge operations should never reach this wait call, because they are always try operations.
                     // If they fail to enter the components, then it means that there are an ongoing flush/merge operation on
                     // the same components, so they should not proceed.
+                    if (opType == LSMOperationType.MODIFICATION) {
+                        // before waiting, make sure the index is in a modifiable state to avoid waiting forever.
+                        ensureIndexModifiable();
+                    }
                     opTracker.wait();
                 } catch (InterruptedException e) {
                     throw new HyracksDataException(e);
@@ -186,6 +191,7 @@
                 break;
             case MERGE:
                 lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
+                break;
             default:
                 break;
         }
@@ -498,15 +504,17 @@
         }
 
         ILSMDiskComponent newComponent = null;
+        boolean failedOperation = false;
         try {
             newComponent = lsmIndex.flush(operation);
             operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
             lsmIndex.markAsValid(newComponent);
         } catch (Throwable e) {
+            failedOperation = true;
             e.printStackTrace();
             throw e;
         } finally {
-            exitComponents(ctx, LSMOperationType.FLUSH, newComponent, false);
+            exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
             operation.getCallback().afterFinalize(LSMOperationType.FLUSH, newComponent);
         }
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -545,15 +553,17 @@
         }
 
         ILSMDiskComponent newComponent = null;
+        boolean failedOperation = false;
         try {
             newComponent = lsmIndex.merge(operation);
             operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
             lsmIndex.markAsValid(newComponent);
         } catch (Throwable e) {
+            failedOperation = true;
             e.printStackTrace();
             throw e;
         } finally {
-            exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
+            exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
             operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
         }
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -660,4 +670,23 @@
             exit(ctx);
         }
     }
+
+    /***
+     * Ensures the index is in a modifiable state
+     * @throws HyracksDataException if the index is not in a modifiable state
+     */
+    private void ensureIndexModifiable() throws HyracksDataException {
+        // find if there is any memory component which is in a writable state or eventually will be in a writable state
+        for (ILSMMemoryComponent memoryComponent : lsmIndex.getMemoryComponents()) {
+            switch (memoryComponent.getState()) {
+                case INACTIVE:
+                case READABLE_WRITABLE:
+                case READABLE_UNWRITABLE_FLUSHING:
+                    return;
+                default:
+                    // continue to the next component
+            }
+        }
+        throw HyracksDataException.create(ErrorCode.CANNOT_MODIFY_INDEX_DISK_IS_FULL);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/pom.xml b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
index 5a68df0..3b03fce 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-util/pom.xml
@@ -64,6 +64,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java
new file mode 100644
index 0000000..9a65d72
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/DiskUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.SystemUtils;
+
+public class DiskUtil {
+
+    private static final Logger LOGGER = Logger.getLogger(DiskUtil.class.getName());
+
+    private DiskUtil() {
+        throw new AssertionError("Util class should not be initialized.");
+    }
+
+    /**
+     * Mounts a RAM disk
+     *
+     * @param name
+     * @param size
+     * @param unit
+     * @return The root of the mounted disk
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static Path mountRamDisk(String name, int size, StorageUtil.StorageUnit unit)
+            throws IOException, InterruptedException {
+        if (SystemUtils.IS_OS_MAC) {
+            return mountMacRamDisk(name, (StorageUtil.getIntSizeInBytes(size, unit) * 2) / StorageUtil.BASE);
+        } else if (SystemUtils.IS_OS_LINUX) {
+            return mountLinuxRamDisk(name, size + unit.getLinuxUnitTypeInLetter());
+        }
+        throw new UnsupportedOperationException("Unsupported OS: " + System.getProperty("os.name"));
+    }
+
+    /**
+     * Unmounts a disk
+     *
+     * @param name
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static void unmountRamDisk(String name) throws IOException, InterruptedException {
+        if (SystemUtils.IS_OS_MAC) {
+            unmountMacRamDisk(name);
+        } else if (SystemUtils.IS_OS_LINUX) {
+            unmountLinuxRamDisk(name);
+        }
+    }
+
+    private static Path mountMacRamDisk(String name, long size) throws IOException, InterruptedException {
+        final String cmd = "diskutil erasevolume HFS+ '" + name + "' `hdiutil attach -nomount ram://" + size + "`";
+        final ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", cmd);
+        final Process p = pb.start();
+        watchProcess(p);
+        p.waitFor();
+        return Paths.get("/Volumes", name);
+    }
+
+    private static void unmountMacRamDisk(String name) throws InterruptedException, IOException {
+        final String cmd = "diskutil unmount " + name;
+        final ProcessBuilder pb = new ProcessBuilder("/bin/sh", "-c", cmd);
+        final Process p = pb.start();
+        watchProcess(p);
+        p.waitFor();
+    }
+
+    private static Path mountLinuxRamDisk(String name, String size) throws IOException, InterruptedException {
+        Path root = Paths.get("/tmp", name);
+        if (!Files.exists(root)) {
+            Files.createFile(root);
+        }
+        final String cmd = "mount -o size=" + size + " -t tmpfs none /tmp/" + name;
+        final ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd);
+        final Process p = pb.start();
+        watchProcess(p);
+        p.waitFor();
+        return root;
+    }
+
+    private static void unmountLinuxRamDisk(String name) throws InterruptedException, IOException {
+        final String cmd = "umount /tmp/" + name;
+        final ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd);
+        final Process p = pb.start();
+        watchProcess(p);
+        p.waitFor();
+    }
+
+    private static void watchProcess(Process p) {
+        new Thread(() -> {
+            final BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            String line;
+            try {
+                while ((line = input.readLine()) != null) {
+                    LOGGER.info(line);
+                }
+            } catch (IOException e) {
+                LOGGER.log(Level.WARNING, e.getMessage(), e);
+            }
+        }).start();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
index 9001e1b..dbfe6f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
@@ -23,17 +23,18 @@
 
 public class StorageUtil {
 
-    private static final int BASE = 1024;
+    public static final int BASE = 1024;
 
     public enum StorageUnit {
-        BYTE("B", 1),
-        KILOBYTE("KB", BASE),
-        MEGABYTE("MB", KILOBYTE.multiplier * BASE),
-        GIGABYTE("GB", MEGABYTE.multiplier * BASE),
-        TERABYTE("TB", GIGABYTE.multiplier * BASE),
-        PETABYTE("PB", TERABYTE.multiplier * BASE);
+        BYTE("B", "b", 1),
+        KILOBYTE("KB", "kb", BASE),
+        MEGABYTE("MB", "m", KILOBYTE.multiplier * BASE),
+        GIGABYTE("GB", "g", MEGABYTE.multiplier * BASE),
+        TERABYTE("TB", "t", GIGABYTE.multiplier * BASE),
+        PETABYTE("PB", "p", TERABYTE.multiplier * BASE);
 
         private final String unitTypeInLetter;
+        private final String linuxUnitTypeInLetter;
         private final long multiplier;
         private static final Map<String, StorageUnit> SUFFIX_TO_UNIT_MAP = new HashMap<>();
 
@@ -43,8 +44,9 @@
             }
         }
 
-        StorageUnit(String unitTypeInLetter, long multiplier) {
+        StorageUnit(String unitTypeInLetter, String linuxUnitTypeInLetter, long multiplier) {
             this.unitTypeInLetter = unitTypeInLetter;
+            this.linuxUnitTypeInLetter = linuxUnitTypeInLetter;
             this.multiplier = multiplier;
         }
 
@@ -57,6 +59,10 @@
             return value * multiplier;
         }
 
+        public String getLinuxUnitTypeInLetter() {
+            return linuxUnitTypeInLetter;
+        }
+
         public static StorageUnit lookupBySuffix(String name) {
             return SUFFIX_TO_UNIT_MAP.get(name);
         }