Removed some new byte[] creation and implemented explicit resource release

 - Removed some new byte[] creation (mainly for toByteArray() method)
 - Implemented Explicit resource release during a hash join
 - Refactorered Hash-join code to remove repetitive same condition check

Change-Id: I55195696a3db09c14b8debdd78f5f68d701b9129
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1378
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java
index c97e642..bd5ea2d 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java
@@ -50,7 +50,7 @@
             expr.accept(visitor, 0);
         }
         output.close();
-        return new String(bos.toByteArray());
+        return bos.toString();
     }
 
     public static String toSQLPPString(List<Statement> exprs) throws AsterixException {
@@ -61,6 +61,6 @@
             expr.accept(visitor, 0);
         }
         output.close();
-        return new String(bos.toByteArray());
+        return bos.toString();
     }
 }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java
index f700439..0d840cd 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java
@@ -96,7 +96,7 @@
             expr.accept(visitor, 0);
         }
         output.close();
-        return new String(bos.toByteArray());
+        return bos.toString();
     }
 
 }
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java
index ff0a8e1..2c8f582 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java
@@ -90,7 +90,7 @@
             expr.accept(visitor, 0);
         }
         output.close();
-        return new String(bos.toByteArray());
+        return bos.toString();
     }
 
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
index 7e1fe46..d18b4d1 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
@@ -269,11 +269,8 @@
                         .deserialize(fieldType.getByteArray()[fieldType.getStartOffset()]);
                 ps.print(typeTag);
 
-                //collect the output message
-                byte[] output = fieldBos.toByteArray();
-
-                //throw the exception
-                throw new IllegalStateException("type mismatch: including an extra field " + new String(output));
+                //collect the output message and throw the exception
+                throw new IllegalStateException("type mismatch: including an extra field " + fieldBos.toString());
             }
         }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 608e442..8a52529 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.replication.functions;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -30,6 +29,7 @@
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
+import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
 
 public class ReplicationProtocol {
 
@@ -84,7 +84,7 @@
 
     public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
             throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
         try (DataOutputStream oos = new DataOutputStream(outputStream)) {
             lsmCompProp.serialize(oos);
             int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
@@ -95,7 +95,7 @@
             }
             buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
             buffer.putInt(oos.size());
-            buffer.put(outputStream.toByteArray());
+            buffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
             buffer.flip();
             return buffer;
         }
@@ -132,7 +132,7 @@
 
     public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
             ReplicationRequestType requestType) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
         try (DataOutputStream oos = new DataOutputStream(outputStream)) {
             afp.serialize(oos);
             int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
@@ -143,7 +143,7 @@
             }
             requestBuffer.putInt(requestType.ordinal());
             requestBuffer.putInt(oos.size());
-            requestBuffer.put(outputStream.toByteArray());
+            requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
             requestBuffer.flip();
             return requestBuffer;
         }
@@ -156,13 +156,13 @@
     }
 
     public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
         try (DataOutputStream oos = new DataOutputStream(outputStream)) {
             event.serialize(oos);
             ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
             buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
             buffer.putInt(oos.size());
-            buffer.put(outputStream.toByteArray());
+            buffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
             buffer.flip();
             return buffer;
         }
@@ -177,7 +177,7 @@
 
     public static ByteBuffer writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request)
             throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
         try (DataOutputStream oos = new DataOutputStream(outputStream)) {
             request.serialize(oos);
 
@@ -189,7 +189,7 @@
             }
             buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
             buffer.putInt(oos.size());
-            buffer.put(outputStream.toByteArray());
+            buffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
             buffer.flip();
             return buffer;
         }
@@ -197,7 +197,7 @@
 
     public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
             throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
         try (DataOutputStream oos = new DataOutputStream(outputStream)) {
             request.serialize(oos);
             int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
@@ -208,7 +208,7 @@
             }
             buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
             buffer.putInt(oos.size());
-            buffer.put(outputStream.toByteArray());
+            buffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
             buffer.flip();
             return buffer;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java
new file mode 100644
index 0000000..59c7786
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * This is an extended class of ByteArrayOutputStream class that can return the current buffer array and its length.
+ * Use this class to avoid a new byte[] creation when using toArray() method.
+ */
+public class ExtendedByteArrayOutputStream extends ByteArrayOutputStream {
+
+    public ExtendedByteArrayOutputStream() {
+        super();
+    }
+
+    public ExtendedByteArrayOutputStream(int size) {
+        super(size);
+    }
+
+    public synchronized byte[] getByteArray() {
+        return buf;
+    }
+
+    /**
+     * Returns the current length of this stream (not capacity).
+     */
+    public synchronized int getLength() {
+        return count;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
index 626edba..4499e32c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
@@ -117,7 +117,12 @@
 
     @Override
     public void close() {
-        buffers.clear();
+        for (Iterator<ByteBuffer> iter = buffers.iterator(); iter.hasNext();) {
+            ByteBuffer next = iter.next();
+            ctx.deallocateFrames(next.capacity());
+            iter.remove();
+        }
         allocated = 0;
+        buffers.clear();
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 1ed34f6..4d4f279 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -210,6 +210,11 @@
 
     @Override
     public void close() {
+        for (IFrameBufferManager part : partitionArray) {
+            if (part != null) {
+                part.close();
+            }
+        }
         framePool.close();
         Arrays.fill(partitionArray, null);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 0da9da4..ed7ae8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -57,6 +57,7 @@
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
     private final IPredicateEvaluator predEvaluator;
+    private final boolean isTableCapacityNotZero;
 
     private static final Logger LOGGER = Logger.getLogger(InMemoryHashJoin.class.getName());
 
@@ -97,6 +98,11 @@
             missingTupleBuild = null;
         }
         reverseOutputOrder = reverse;
+        if (tableSize != 0) {
+            isTableCapacityNotZero = true;
+        } else {
+            isTableCapacityNotZero = false;
+        }
         LOGGER.fine("InMemoryHashJoin has been created for a table size of " + tableSize + " for Thread ID "
                 + Thread.currentThread().getId() + ".");
     }
@@ -113,17 +119,17 @@
         }
     }
 
-    void join(IFrameTupleAccessor accessorProbe, int tid, IFrameWriter writer) throws HyracksDataException {
-        this.accessorProbe = accessorProbe;
+    /**
+     * Reads the given tuple from the probe side and joins it with tuples from the build side.
+     * This method assumes that the accessorProbe is already set to the current probe frame.
+     */
+    void join(int tid, IFrameWriter writer) throws HyracksDataException {
         boolean matchFound = false;
-        if (tableSize != 0) {
+        if (isTableCapacityNotZero) {
             int entry = tpcProbe.partition(accessorProbe, tid, tableSize);
-            int offset = 0;
-            do {
-                table.getTuplePointer(entry, offset++, storedTuplePointer);
-                if (storedTuplePointer.getFrameIndex() < 0) {
-                    break;
-                }
+            int tupleCount = table.getTupleCount(entry);
+            for (int i = 0; i < tupleCount; i++) {
+                table.getTuplePointer(entry, i, storedTuplePointer);
                 int bIndex = storedTuplePointer.getFrameIndex();
                 int tIndex = storedTuplePointer.getTupleIndex();
                 accessorBuild.reset(buffers.get(bIndex));
@@ -135,7 +141,7 @@
                         appendToResult(tid, tIndex, writer);
                     }
                 }
-            } while (true);
+            }
         }
         if (!matchFound && isLeftOuter) {
             FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, tid,
@@ -148,19 +154,31 @@
         accessorProbe.reset(buffer);
         int tupleCount0 = accessorProbe.getTupleCount();
         for (int i = 0; i < tupleCount0; ++i) {
-            join(accessorProbe, i, writer);
+            join(i, writer);
         }
     }
 
+    public void resetAccessorProbe(IFrameTupleAccessor newAccessorProbe) {
+        accessorProbe.reset(newAccessorProbe.getBuffer());
+    }
+
     public void closeJoin(IFrameWriter writer) throws HyracksDataException {
         appender.write(writer, true);
         int nFrames = buffers.size();
+        int totalSize = 0;
+        for (int i = 0; i < nFrames; i++) {
+            totalSize += buffers.get(i).capacity();
+        }
         buffers.clear();
-        ctx.deallocateFrames(nFrames);
+        ctx.deallocateFrames(totalSize);
         LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
                 + Thread.currentThread().getId() + ".");
     }
 
+    public void closeTable() throws HyracksDataException {
+        table.close();
+    }
+
     private boolean evaluatePredicate(int tIx1, int tIx2) {
         if (reverseOutputOrder) { //Role Reversal Optimization is triggered
             return (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index b80059b..0770784 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -20,7 +20,6 @@
 
 import java.nio.ByteBuffer;
 import java.util.BitSet;
-import java.util.logging.Logger;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.comm.IFrame;
@@ -361,6 +360,7 @@
             inMemJoiner.join(buffer, writer);
             return;
         }
+        inMemJoiner.resetAccessorProbe(accessorProbe);
         for (int i = 0; i < tupleCount; ++i) {
             int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
 
@@ -380,7 +380,7 @@
                         bufferManager.clearPartition(victim);
                     }
                 } else { //pid is Resident
-                    inMemJoiner.join(accessorProbe, i, writer);
+                    inMemJoiner.join(i, writer);
                 }
                 probePSizeInTups[pid]++;
             }
@@ -405,9 +405,13 @@
     }
 
     public void closeProbe(IFrameWriter writer) throws HyracksDataException {
-        //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+        //We do NOT join the spilled partitions here, that decision is made at the descriptor level
+        //(which join technique to use)
         inMemJoiner.closeJoin(writer);
+        inMemJoiner.closeTable();
         closeAllSpilledPartitions(SIDE.PROBE);
+        bufferManager.close();
+        inMemJoiner = null;
         bufferManager = null;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 183d7f6..e308dd8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -667,6 +667,7 @@
                     }
                     pReader.close();
                     joiner.closeJoin(writer);
+                    joiner.closeTable();
                 }
 
                 private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index b42cdb7..9584f26 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -173,13 +173,18 @@
     @Override
     public void close() {
         int nFrames = contents.size();
-        for (int i = 0; i < headers.length; i++)
-            headers[i] = null;
+        int hFrames = 0;
+        for (int i = 0; i < headers.length; i++) {
+            if (headers[i] != null) {
+                hFrames++;
+                headers[i] = null;
+            }
+        }
         contents.clear();
         frameCurrentIndex.clear();
         tupleCount = 0;
         currentLargestFrameIndex = 0;
-        ctx.deallocateFrames(nFrames);
+        ctx.deallocateFrames((nFrames + hFrames) * frameCapacity * 4);
     }
 
     private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)