Fix Numeric Overflow For Units > GB

refactoring / cleanup
handle negative numbers

Change-Id: Id5a3896ebfb38fc565b3524beed940dfbbf81d4f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1084
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Taewoo Kim <wangsaeu@yahoo.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertyInterpreters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertyInterpreters.java
index 4dd07d8..6703fcd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertyInterpreters.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertyInterpreters.java
@@ -25,93 +25,41 @@
 public class PropertyInterpreters {
 
     public static IPropertyInterpreter<Integer> getIntegerPropertyInterpreter() {
-        return new IPropertyInterpreter<Integer>() {
-            @Override
-            public Integer interpret(String s) throws IllegalArgumentException {
-                try {
-                    return Integer.parseInt(s);
-                } catch (NumberFormatException e) {
-                    throw new IllegalArgumentException(e);
-                }
-            }
-        };
+        return Integer::parseInt;
     }
 
     public static IPropertyInterpreter<Boolean> getBooleanPropertyInterpreter() {
-        return new IPropertyInterpreter<Boolean>() {
-            @Override
-            public Boolean interpret(String s) throws IllegalArgumentException {
-                return Boolean.parseBoolean(s);
-            }
-        };
+        return Boolean::parseBoolean;
     }
 
     public static IPropertyInterpreter<Long> getLongPropertyInterpreter() {
-        return new IPropertyInterpreter<Long>() {
-            @Override
-            public Long interpret(String s) throws IllegalArgumentException {
-                try {
-                    return Long.parseLong(s);
-                } catch (NumberFormatException e) {
-                    throw new IllegalArgumentException(e);
-                }
-            }
-        };
+        return Long::parseLong;
     }
 
     public static IPropertyInterpreter<Level> getLevelPropertyInterpreter() {
-        return new IPropertyInterpreter<Level>() {
-            @Override
-            public Level interpret(String s) throws IllegalArgumentException {
-                return Level.parse(s);
-            }
-        };
+        return Level::parse;
     }
 
     public static IPropertyInterpreter<String> getStringPropertyInterpreter() {
-        return new IPropertyInterpreter<String>() {
-            @Override
-            public String interpret(String s) throws IllegalArgumentException {
-                return s;
-            }
-        };
+        return s -> s;
     }
 
     public static IPropertyInterpreter<Double> getDoublePropertyInterpreter() {
-        return new IPropertyInterpreter<Double>() {
-            @Override
-            public Double interpret(String s) throws IllegalArgumentException {
-                try {
-                    return Double.parseDouble(s);
-                } catch (NumberFormatException e) {
-                    throw new IllegalArgumentException(e);
-                }
-            }
-        };
+        return Double::parseDouble;
     }
 
     public static IPropertyInterpreter<Long> getLongBytePropertyInterpreter() {
-        return new IPropertyInterpreter<Long>() {
-            @Override
-            public Long interpret(String s) throws IllegalArgumentException {
-                try {
-                    return StorageUtil.getByteValue(s);
-                } catch (NumberFormatException e) {
-                    throw new IllegalArgumentException(e);
-                }
-            }
-        };
+        return StorageUtil::getByteValue;
     }
 
     public static IPropertyInterpreter<Integer> getIntegerBytePropertyInterpreter() {
-        return new IPropertyInterpreter<Integer>() {
-            @Override
-            public Integer interpret(String s) throws IllegalArgumentException {
-                try {
-                    return (int) StorageUtil.getByteValue(s);
-                } catch (NumberFormatException e) {
-                    throw new IllegalArgumentException(e);
-                }
+        return s -> {
+                long result = StorageUtil.getByteValue(s);
+            if (result > Integer.MAX_VALUE || result < Integer.MIN_VALUE) {
+                throw new IllegalArgumentException(
+                        "The given value: " + result + " is not within the int range.");
+            } else {
+                return (int) result;
             }
         };
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
index 7025f15..31bce7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
@@ -18,28 +18,48 @@
  */
 package org.apache.hyracks.util;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class StorageUtil {
 
-    private static final int BASE = 1024, KB = BASE, MB = KB * BASE, GB = MB * BASE, TB = GB * BASE, PB = TB * BASE;
+    private static final int BASE = 1024;
 
     public enum StorageUnit {
-        BYTE("B"),
-        KILOBYTE("KB"),
-        MEGABYTE("MB"),
-        GIGABYTE("GB"),
-        TERABYTE("TB"),
-        PETABYTE("PB");
+        BYTE("B", 1),
+        KILOBYTE("KB", BASE),
+        MEGABYTE("MB", KILOBYTE.multiplier * BASE),
+        GIGABYTE("GB", MEGABYTE.multiplier * BASE),
+        TERABYTE("TB", GIGABYTE.multiplier * BASE),
+        PETABYTE("PB", TERABYTE.multiplier * BASE);
 
         private final String unitTypeInLetter;
+        private final long multiplier;
+        private static final Map<String, StorageUnit> SUFFIX_TO_UNIT_MAP = new HashMap<>();
 
-        private StorageUnit(String unitTypeInLetter) {
+        static {
+            for (StorageUnit unit : values()) {
+                SUFFIX_TO_UNIT_MAP.put(unit.unitTypeInLetter, unit);
+            }
+        }
+
+        StorageUnit(String unitTypeInLetter, long multiplier) {
             this.unitTypeInLetter = unitTypeInLetter;
+            this.multiplier = multiplier;
         }
 
         @Override
         public String toString() {
             return this.unitTypeInLetter;
         }
+
+        public double toBytes(double value) {
+            return value * multiplier;
+        }
+
+        public static StorageUnit lookupBySuffix(String name) {
+            return SUFFIX_TO_UNIT_MAP.get(name);
+        }
     }
 
     private StorageUtil() {
@@ -47,119 +67,85 @@
     }
 
     public static int getSizeInBytes(final int size, final StorageUnit unit) {
-        double result = getSizeInBytes((double) size, unit);
-        if (result > Integer.MAX_VALUE) {
-            throw new IllegalStateException("The given value:" + result + " is not within the integer range.");
+        double result = unit.toBytes(size);
+        if (result > Integer.MAX_VALUE || result < Integer.MIN_VALUE) {
+            throw new IllegalArgumentException("The given value:" + result + " is not within the integer range.");
         } else {
             return (int) result;
         }
     }
 
     public static long getSizeInBytes(final long size, final StorageUnit unit) {
-        double result = getSizeInBytes((double) size, unit);
-        if (result > Long.MAX_VALUE) {
-            throw new IllegalStateException("The given value:" + result + " is not within the long range.");
+        double result = unit.toBytes(size);
+        if (result > Long.MAX_VALUE || result < Long.MIN_VALUE) {
+            throw new IllegalArgumentException("The given value:" + result + " is not within the long range.");
         } else {
             return (long) result;
         }
     }
 
-    public static double getSizeInBytes(final double size, final StorageUnit unit) {
-        switch (unit) {
-            case BYTE:
-                return size;
-            case KILOBYTE:
-                return size * KB;
-            case MEGABYTE:
-                return size * MB;
-            case GIGABYTE:
-                return size * GB;
-            case TERABYTE:
-                return size * TB;
-            case PETABYTE:
-                return size * PB;
-            default:
-                throw new IllegalStateException("Unsupported unit: " + unit);
-        }
-    }
-
     /**
      * Helper method to parse a byte unit string to its double value and unit
      * (e.g., 10,345.8MB becomes Pair<10345.8, StorageUnit.MB>.)
      *
-     * @throws HyracksException
+     * @throws IllegalArgumentException
      */
-    public static ByteValueStringInfo parseByteUnitString(String s) {
+    public static double getSizeInBytes(String s) {
         String sSpaceRemoved = s.replaceAll(" ", "");
         String sUpper = sSpaceRemoved.toUpperCase();
-        ByteValueStringInfo valueAndUnitType = new ByteValueStringInfo();
 
         // Default type
-        StorageUtil.StorageUnit unitType = StorageUnit.BYTE;
+        StorageUtil.StorageUnit unitType;
 
         // If the length is 1, it should only contain a digit number.
         if (sUpper.length() == 1) {
             if (Character.isDigit(sUpper.charAt(0))) {
                 unitType = StorageUnit.BYTE;
             } else {
-                throw new IllegalStateException(
-                        "The given string: " + s + " is not a byte unit string (e.g., 320KB or 1024).");
+                throw invalidFormatException(s);
             }
         } else if (sUpper.length() > 1) {
             String checkStr = sUpper.substring(sUpper.length() - 2);
-            boolean found = false;
-            for (StorageUnit unit : StorageUnit.values()) {
-                if (checkStr.equals(unit.toString())) {
-                    unitType = unit;
-                    found = true;
-                    break;
-                }
-            }
+            unitType = StorageUnit.lookupBySuffix(checkStr);
 
-            if (!found) {
+            if (unitType == null) {
                 // The last suffix should be at least "B" or a digit to be qualified as byte unit string.
                 char lastChar = sUpper.charAt(sUpper.length() - 1);
                 if (sUpper.substring(sUpper.length() - 1).equals(StorageUnit.BYTE.toString())
                         || Character.isDigit(lastChar)) {
                     unitType = StorageUnit.BYTE;
                 } else {
-                    throw new IllegalStateException(
-                            "The given string: " + s + " is not a byte unit string (e.g., 320KB or 1024).");
+                    throw invalidFormatException(s);
                 }
             }
         } else {
             // String length is zero. We can't parse this string.
-            throw new IllegalStateException(
-                    "The given string: " + s + " is not a byte unit string (e.g., 320KB or 1024).");
+            throw invalidFormatException(s);
         }
 
         // Strip all unit suffixes such as KB, MB ...
-        String sFinalVal = sUpper.replaceAll("[^\\.0123456789]", "");
+        String sFinalVal = sUpper.replaceAll("[^-\\.0123456789]", "");
 
-        // Return the digit and its unit type.
-        valueAndUnitType.value = Double.parseDouble(sFinalVal);
-        valueAndUnitType.unitType = unitType;
+        // Return the bytes.
+        return unitType.toBytes(Double.parseDouble(sFinalVal));
+    }
 
-        return valueAndUnitType;
+    private static IllegalArgumentException invalidFormatException(String s) {
+        return new IllegalArgumentException(
+                "The given string: " + s + " is not a byte unit string (e.g., 320KB or 1024).");
     }
 
     /**
      * Return byte value for the given string (e.g., 0.1KB, 100kb, 1mb, 3MB, 8.5GB ...)
      *
-     * @throws HyracksException
+     * @throws IllegalArgumentException
      */
     public static long getByteValue(String s) {
-        try {
-            ByteValueStringInfo valueAndUnitType = parseByteUnitString(s);
-
-            double result = getSizeInBytes(valueAndUnitType.value, valueAndUnitType.unitType);
-            if (result > Long.MAX_VALUE) {
-                throw new IllegalStateException("The given value:" + result + " is not within the long range.");
-            } else {
-                return (long) result;
-            }
-        } catch (NumberFormatException e) {
-            throw new IllegalArgumentException(e);
+        double result = getSizeInBytes(s);
+        if (result > Long.MAX_VALUE || result < Long.MIN_VALUE) {
+            throw new IllegalArgumentException("The given value:" + result + " is not within the long range.");
+        } else {
+            return (long) result;
         }
     }
 
@@ -177,14 +163,4 @@
         final int baseValue = (63 - Long.numberOfLeadingZeros(bytes)) / 10;
         return String.format("%.2f %sB", (double) bytes / (1L << (baseValue * 10)), " kMGTPE".charAt(baseValue));
     }
-
-    private static class ByteValueStringInfo {
-        double value;
-        StorageUnit unitType;
-
-        ByteValueStringInfo() {
-            value = 0.0;
-            unitType = StorageUnit.BYTE;
-        }
-    }
 }