ASTERIXDB-1673 and some small cleanups

- Fix potential object creation issue in ARecordCaster.
- refactor getFieldOffsetById() in ARecordSerializerDeserailzer.
- remove dead code in method set() in ARecordVisitablePointable.
- Introduce unrestable allocation in PointableAllocator as static method.

Change-Id: I3802a5a33b46b58967ffb4a28a70ebe9671bfabf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1260
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
index 55c9d13..fb5d0c7 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ARecordSerializerDeserializer.java
@@ -214,8 +214,7 @@
 
     @SuppressWarnings("unchecked")
     public static void serializeSimpleSchemalessRecord(List<Pair<String, String>> record, DataOutput dataOutput,
-            boolean writeTypeTag)
-            throws HyracksDataException {
+            boolean writeTypeTag) throws HyracksDataException {
         ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
                 .getSerializerDeserializer(BuiltinType.ASTRING);
         RecordBuilder confRecordBuilder = new RecordBuilder();
@@ -268,67 +267,39 @@
 
     public static final int getFieldOffsetById(byte[] serRecord, int offset, int fieldId, int nullBitmapSize,
             boolean isOpen) {
-        byte nullTestCode = (byte) (1 << (7 - 2 * (fieldId % 4)));
-        byte missingTestCode = (byte) (1 << (7 - 2 * (fieldId % 4) - 1));
+        final byte nullTestCode = (byte) (1 << (7 - 2 * (fieldId % 4)));
+        final byte missingTestCode = (byte) (1 << (7 - 2 * (fieldId % 4) - 1));
+
+        //early exit if not Record
+        if (serRecord[offset] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
+            return -1;
+        }
+
+        //advance to isExpanded or numberOfSchemaFields
+        int pointer = offset + 5;
+
         if (isOpen) {
-            if (serRecord[0 + offset] == ATypeTag.RECORD.serialize()) {
-                // 5 is the index of the byte that determines whether the record
-                // is expanded or not, i.e. it has an open part.
-                if (serRecord[5 + offset] == 1) { // true
-                    if (nullBitmapSize > 0) {
-                        // 14 = tag (1) + record Size (4) + isExpanded (1) +
-                        // offset of openPart (4) + number of closed fields (4)
-                        int pos = 14 + offset + fieldId / 4;
-                        if ((serRecord[pos] & nullTestCode) == 0) {
-                            // the field value is null
-                            return 0;
-                        }
-                        if ((serRecord[pos] & missingTestCode) == 0) {
-                            // the field value is missing
-                            return -1;
-                        }
-                    }
-                    return offset + AInt32SerializerDeserializer.getInt(serRecord,
-                            14 + offset + nullBitmapSize + (4 * fieldId));
-                } else {
-                    if (nullBitmapSize > 0) {
-                        // 9 = tag (1) + record Size (4) + isExpanded (1) +
-                        // number of closed fields (4)
-                        int pos = 10 + offset + fieldId / 4;
-                        if ((serRecord[pos] & nullTestCode) == 0) {
-                            // the field value is null
-                            return 0;
-                        }
-                        if ((serRecord[pos] & missingTestCode) == 0) {
-                            // the field value is missing
-                            return -1;
-                        }
-                    }
-                    return offset + AInt32SerializerDeserializer.getInt(serRecord,
-                            10 + offset + nullBitmapSize + (4 * fieldId));
-                }
-            } else {
+            final boolean isExpanded = serRecord[pointer] == 1;
+            //if isExpanded, advance to numberOfSchemaFields
+            pointer += 1 + (isExpanded ? 4 : 0);
+        }
+
+        //advance to nullBitmap
+        pointer += 4;
+
+        if (nullBitmapSize > 0) {
+            final int pos = pointer + fieldId / 4;
+            if ((serRecord[pos] & nullTestCode) == 0) {
+                // the field value is null
+                return 0;
+            }
+            if ((serRecord[pos] & missingTestCode) == 0) {
+                // the field value is missing
                 return -1;
             }
-        } else {
-            if (serRecord[offset] != ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
-                return Integer.MIN_VALUE;
-            }
-            if (nullBitmapSize > 0) {
-                // 9 = tag (1) + record Size (4) + number of closed fields
-                // (4)
-                int pos = 9 + offset + fieldId / 4;
-                if ((serRecord[pos] & nullTestCode) == 0) {
-                    // the field value is null
-                    return 0;
-                }
-                if ((serRecord[pos] & missingTestCode) == 0) {
-                    // the field value is missing
-                    return -1;
-                }
-            }
-            return offset + AInt32SerializerDeserializer.getInt(serRecord, 9 + offset + nullBitmapSize + (4 * fieldId));
         }
+
+        return offset + AInt32SerializerDeserializer.getInt(serRecord, pointer + nullBitmapSize + (4 * fieldId));
     }
 
     public static final int getFieldOffsetByName(byte[] serRecord, int start, int len, byte[] fieldName, int nstart)
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
index 68fff2f..d3c18a9 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java
@@ -75,17 +75,17 @@
 
     private final int numberOfSchemaFields;
     private final int[] fieldOffsets;
-    private final IVisitablePointable nullReference = AFlatValuePointable.FACTORY.create(null);
-    private final IVisitablePointable missingReference = AFlatValuePointable.FACTORY.create(null);
+    private final IVisitablePointable nullReference = PointableAllocator.allocateUnrestableEmpty();
+    private final IVisitablePointable missingReference = PointableAllocator.allocateUnrestableEmpty();
 
     private int closedPartTypeInfoSize = 0;
-    private int offsetArrayOffset;
     private ATypeTag typeTag;
 
     /**
      * private constructor, to prevent constructing it arbitrarily
      *
      * @param inputType
+     *            inputType should not be null. Use FULLY_OPEN_RECORD_TYPE instead.
      */
     public ARecordVisitablePointable(ARecordType inputType) {
         this.inputRecType = inputType;
@@ -165,27 +165,26 @@
 
         boolean isExpanded = false;
         int openPartOffset = 0;
-        int s = start;
-        int recordOffset = s;
-        if (inputRecType == null) {
-            openPartOffset = s + AInt32SerializerDeserializer.getInt(b, s + 6);
-            s += 8;
-            isExpanded = true;
-        } else {
-            if (inputRecType.isOpen()) {
-                isExpanded = b[s + 5] == 1 ? true : false;
-                if (isExpanded) {
-                    openPartOffset = s + AInt32SerializerDeserializer.getInt(b, s + 6);
-                    s += 10;
-                } else {
-                    s += 6;
-                }
-            } else {
-                s += 5;
+        int recordOffset = start;
+        int offsetArrayOffset;
+
+        //advance to either isExpanded or numberOfSchemaFields
+        int s = start + 5;
+        //inputRecType will never be null.
+        if (inputRecType.isOpen()) {
+            isExpanded = b[s] == 1;
+            //advance either to openPartOffset or numberOfSchemaFields
+            s += 1;
+            if (isExpanded) {
+                openPartOffset = start + AInt32SerializerDeserializer.getInt(b, s);
+                //advance to numberOfSchemaFields
+                s += 4;
             }
         }
+
         try {
             if (numberOfSchemaFields > 0) {
+                //advance to nullBitMap if hasOptionalFields, or fieldOffsets
                 s += 4;
                 int nullBitMapOffset = 0;
                 boolean hasOptionalFields = NonTaggedFormatUtil.hasOptionalField(inputRecType);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/PointableAllocator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/PointableAllocator.java
index 0f66a57..c88b95c 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/PointableAllocator.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/PointableAllocator.java
@@ -63,6 +63,16 @@
     }
 
     /**
+     * This method should ONLY be used for long lasting IVisitablePointable.
+     *
+     * @return
+     *         a generic type IVisitablePointable.
+     */
+    public static IVisitablePointable allocateUnrestableEmpty() {
+        return AFlatValuePointable.FACTORY.create(null);
+    }
+
+    /**
      * allocate closed part value pointable
      *
      * @param type
@@ -98,8 +108,8 @@
                 if (listItemType.isDerivedType())
                     return allocateFieldValue(listItemType, b, offset + 1);
                 else
-                    return listValueAllocator.allocate(unorederedListTypeAllocator.allocate(TypeTagUtil
-                            .getBuiltinTypeByTag(listItemType)));
+                    return listValueAllocator.allocate(
+                            unorederedListTypeAllocator.allocate(TypeTagUtil.getBuiltinTypeByTag(listItemType)));
             }
         } else if (typeTag.equals(ATypeTag.ORDEREDLIST)) {
             ATypeTag listItemType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[offset]);
@@ -109,8 +119,8 @@
                 if (listItemType.isDerivedType())
                     return allocateFieldValue(listItemType, b, offset + 1);
                 else
-                    return listValueAllocator.allocate(orederedListTypeAllocator.allocate(TypeTagUtil
-                            .getBuiltinTypeByTag(listItemType)));
+                    return listValueAllocator.allocate(
+                            orederedListTypeAllocator.allocate(TypeTagUtil.getBuiltinTypeByTag(listItemType)));
             }
         } else
             return flatValueAllocator.allocate(null);
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastVisitor.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastVisitor.java
index 00c4075..e57fa06 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastVisitor.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ACastVisitor.java
@@ -110,8 +110,8 @@
             arg.first.set(accessor);
             return null;
         }
-        ATypeTag inputTypeTag =
-                EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(accessor.getByteArray()[accessor.getStartOffset()]);
+        ATypeTag inputTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                .deserialize(accessor.getByteArray()[accessor.getStartOffset()]);
         if (!needPromote(inputTypeTag, reqTypeTag)) {
             arg.first.set(accessor);
         } else {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/AListCaster.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/AListCaster.java
index 5dee990..7115827 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/AListCaster.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/AListCaster.java
@@ -43,11 +43,9 @@
  * ACastVisitor.
  */
 class AListCaster {
-    // pointable allocator
-    private final PointableAllocator allocator = new PointableAllocator();
 
     // for storing the cast result
-    private final IVisitablePointable itemTempReference = allocator.allocateEmpty();
+    private final IVisitablePointable itemTempReference = PointableAllocator.allocateUnrestableEmpty();
     private final Triple<IVisitablePointable, IAType, Boolean> itemVisitorArg = new Triple<>(itemTempReference, null,
             null);
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
index 0293fcf..7e1fe46 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
@@ -71,18 +71,18 @@
     private final DataOutputStream dos = new DataOutputStream(bos);
 
     private final RecordBuilder recBuilder = new RecordBuilder();
-    private final IVisitablePointable nullTypeTag = allocator.allocateEmpty();
-    private final IVisitablePointable missingTypeTag = allocator.allocateEmpty();
+    private final IVisitablePointable nullTypeTag = PointableAllocator.allocateUnrestableEmpty();
+    private final IVisitablePointable missingTypeTag = PointableAllocator.allocateUnrestableEmpty();
 
-    private final IBinaryComparator fieldNameComparator =
-            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY).createBinaryComparator();
+    private final IBinaryComparator fieldNameComparator = PointableBinaryComparatorFactory
+            .of(UTF8StringPointable.FACTORY).createBinaryComparator();
 
     private final ByteArrayAccessibleOutputStream outputBos = new ByteArrayAccessibleOutputStream();
     private final DataOutputStream outputDos = new DataOutputStream(outputBos);
 
-    private final IVisitablePointable fieldTempReference = allocator.allocateEmpty();
-    private final Triple<IVisitablePointable, IAType, Boolean> nestedVisitorArg =
-            new Triple<>(fieldTempReference, null, null);
+    private final IVisitablePointable fieldTempReference = PointableAllocator.allocateUnrestableEmpty();
+    private final Triple<IVisitablePointable, IAType, Boolean> nestedVisitorArg = new Triple<>(fieldTempReference, null,
+            null);
 
     private int numInputFields = 0;
 
@@ -155,6 +155,7 @@
     private void loadRequiredType(ARecordType reqType) throws IOException {
         reqFieldNames.clear();
         reqFieldTypeTags.clear();
+        allocator.reset();
 
         cachedReqType = reqType;
         int numSchemaFields = reqType.getFieldTypes().length;