Removed some new byte[] creation and implemented explicit resource release
- Removed some new byte[] creation (mainly for toByteArray() method)
- Implemented Explicit resource release during a hash join
- Refactorered Hash-join code to remove repetitive same condition check
Change-Id: I55195696a3db09c14b8debdd78f5f68d701b9129
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1378
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java
index c97e642..bd5ea2d 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/util/AQLFormatPrintUtil.java
@@ -50,7 +50,7 @@
expr.accept(visitor, 0);
}
output.close();
- return new String(bos.toByteArray());
+ return bos.toString();
}
public static String toSQLPPString(List<Statement> exprs) throws AsterixException {
@@ -61,6 +61,6 @@
expr.accept(visitor, 0);
}
output.close();
- return new String(bos.toByteArray());
+ return bos.toString();
}
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java
index f700439..0d840cd 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppAstPrintUtil.java
@@ -96,7 +96,7 @@
expr.accept(visitor, 0);
}
output.close();
- return new String(bos.toByteArray());
+ return bos.toString();
}
}
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java
index ff0a8e1..2c8f582 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java
+++ b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/util/SqlppFormatPrintUtil.java
@@ -90,7 +90,7 @@
expr.accept(visitor, 0);
}
output.close();
- return new String(bos.toByteArray());
+ return bos.toString();
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
index 7e1fe46..d18b4d1 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/cast/ARecordCaster.java
@@ -269,11 +269,8 @@
.deserialize(fieldType.getByteArray()[fieldType.getStartOffset()]);
ps.print(typeTag);
- //collect the output message
- byte[] output = fieldBos.toByteArray();
-
- //throw the exception
- throw new IllegalStateException("type mismatch: including an extra field " + new String(output));
+ //collect the output message and throw the exception
+ throw new IllegalStateException("type mismatch: including an extra field " + fieldBos.toString());
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 608e442..8a52529 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -19,7 +19,6 @@
package org.apache.asterix.replication.functions;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -30,6 +29,7 @@
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
+import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
public class ReplicationProtocol {
@@ -84,7 +84,7 @@
public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
try (DataOutputStream oos = new DataOutputStream(outputStream)) {
lsmCompProp.serialize(oos);
int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
@@ -95,7 +95,7 @@
}
buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
+ buffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
buffer.flip();
return buffer;
}
@@ -132,7 +132,7 @@
public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
ReplicationRequestType requestType) throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
try (DataOutputStream oos = new DataOutputStream(outputStream)) {
afp.serialize(oos);
int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
@@ -143,7 +143,7 @@
}
requestBuffer.putInt(requestType.ordinal());
requestBuffer.putInt(oos.size());
- requestBuffer.put(outputStream.toByteArray());
+ requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
requestBuffer.flip();
return requestBuffer;
}
@@ -156,13 +156,13 @@
}
public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
try (DataOutputStream oos = new DataOutputStream(outputStream)) {
event.serialize(oos);
ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
+ buffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
buffer.flip();
return buffer;
}
@@ -177,7 +177,7 @@
public static ByteBuffer writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request)
throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
try (DataOutputStream oos = new DataOutputStream(outputStream)) {
request.serialize(oos);
@@ -189,7 +189,7 @@
}
buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
+ buffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
buffer.flip();
return buffer;
}
@@ -197,7 +197,7 @@
public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
try (DataOutputStream oos = new DataOutputStream(outputStream)) {
request.serialize(oos);
int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
@@ -208,7 +208,7 @@
}
buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
+ buffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
buffer.flip();
return buffer;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java
new file mode 100644
index 0000000..59c7786
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ExtendedByteArrayOutputStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * This is an extended class of ByteArrayOutputStream class that can return the current buffer array and its length.
+ * Use this class to avoid a new byte[] creation when using toArray() method.
+ */
+public class ExtendedByteArrayOutputStream extends ByteArrayOutputStream {
+
+ public ExtendedByteArrayOutputStream() {
+ super();
+ }
+
+ public ExtendedByteArrayOutputStream(int size) {
+ super(size);
+ }
+
+ public synchronized byte[] getByteArray() {
+ return buf;
+ }
+
+ /**
+ * Returns the current length of this stream (not capacity).
+ */
+ public synchronized int getLength() {
+ return count;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
index 626edba..4499e32c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/DeallocatableFramePool.java
@@ -117,7 +117,12 @@
@Override
public void close() {
- buffers.clear();
+ for (Iterator<ByteBuffer> iter = buffers.iterator(); iter.hasNext();) {
+ ByteBuffer next = iter.next();
+ ctx.deallocateFrames(next.capacity());
+ iter.remove();
+ }
allocated = 0;
+ buffers.clear();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index 1ed34f6..4d4f279 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -210,6 +210,11 @@
@Override
public void close() {
+ for (IFrameBufferManager part : partitionArray) {
+ if (part != null) {
+ part.close();
+ }
+ }
framePool.close();
Arrays.fill(partitionArray, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 0da9da4..ed7ae8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -57,6 +57,7 @@
private final TuplePointer storedTuplePointer;
private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
private final IPredicateEvaluator predEvaluator;
+ private final boolean isTableCapacityNotZero;
private static final Logger LOGGER = Logger.getLogger(InMemoryHashJoin.class.getName());
@@ -97,6 +98,11 @@
missingTupleBuild = null;
}
reverseOutputOrder = reverse;
+ if (tableSize != 0) {
+ isTableCapacityNotZero = true;
+ } else {
+ isTableCapacityNotZero = false;
+ }
LOGGER.fine("InMemoryHashJoin has been created for a table size of " + tableSize + " for Thread ID "
+ Thread.currentThread().getId() + ".");
}
@@ -113,17 +119,17 @@
}
}
- void join(IFrameTupleAccessor accessorProbe, int tid, IFrameWriter writer) throws HyracksDataException {
- this.accessorProbe = accessorProbe;
+ /**
+ * Reads the given tuple from the probe side and joins it with tuples from the build side.
+ * This method assumes that the accessorProbe is already set to the current probe frame.
+ */
+ void join(int tid, IFrameWriter writer) throws HyracksDataException {
boolean matchFound = false;
- if (tableSize != 0) {
+ if (isTableCapacityNotZero) {
int entry = tpcProbe.partition(accessorProbe, tid, tableSize);
- int offset = 0;
- do {
- table.getTuplePointer(entry, offset++, storedTuplePointer);
- if (storedTuplePointer.getFrameIndex() < 0) {
- break;
- }
+ int tupleCount = table.getTupleCount(entry);
+ for (int i = 0; i < tupleCount; i++) {
+ table.getTuplePointer(entry, i, storedTuplePointer);
int bIndex = storedTuplePointer.getFrameIndex();
int tIndex = storedTuplePointer.getTupleIndex();
accessorBuild.reset(buffers.get(bIndex));
@@ -135,7 +141,7 @@
appendToResult(tid, tIndex, writer);
}
}
- } while (true);
+ }
}
if (!matchFound && isLeftOuter) {
FrameUtils.appendConcatToWriter(writer, appender, accessorProbe, tid,
@@ -148,19 +154,31 @@
accessorProbe.reset(buffer);
int tupleCount0 = accessorProbe.getTupleCount();
for (int i = 0; i < tupleCount0; ++i) {
- join(accessorProbe, i, writer);
+ join(i, writer);
}
}
+ public void resetAccessorProbe(IFrameTupleAccessor newAccessorProbe) {
+ accessorProbe.reset(newAccessorProbe.getBuffer());
+ }
+
public void closeJoin(IFrameWriter writer) throws HyracksDataException {
appender.write(writer, true);
int nFrames = buffers.size();
+ int totalSize = 0;
+ for (int i = 0; i < nFrames; i++) {
+ totalSize += buffers.get(i).capacity();
+ }
buffers.clear();
- ctx.deallocateFrames(nFrames);
+ ctx.deallocateFrames(totalSize);
LOGGER.fine("InMemoryHashJoin has finished using " + nFrames + " frames for Thread ID "
+ Thread.currentThread().getId() + ".");
}
+ public void closeTable() throws HyracksDataException {
+ table.close();
+ }
+
private boolean evaluatePredicate(int tIx1, int tIx2) {
if (reverseOutputOrder) { //Role Reversal Optimization is triggered
return (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index b80059b..0770784 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.util.BitSet;
-import java.util.logging.Logger;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.comm.IFrame;
@@ -361,6 +360,7 @@
inMemJoiner.join(buffer, writer);
return;
}
+ inMemJoiner.resetAccessorProbe(accessorProbe);
for (int i = 0; i < tupleCount; ++i) {
int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
@@ -380,7 +380,7 @@
bufferManager.clearPartition(victim);
}
} else { //pid is Resident
- inMemJoiner.join(accessorProbe, i, writer);
+ inMemJoiner.join(i, writer);
}
probePSizeInTups[pid]++;
}
@@ -405,9 +405,13 @@
}
public void closeProbe(IFrameWriter writer) throws HyracksDataException {
- //We do NOT join the spilled partitions here, that decision is made at the descriptor level (which join technique to use)
+ //We do NOT join the spilled partitions here, that decision is made at the descriptor level
+ //(which join technique to use)
inMemJoiner.closeJoin(writer);
+ inMemJoiner.closeTable();
closeAllSpilledPartitions(SIDE.PROBE);
+ bufferManager.close();
+ inMemJoiner = null;
bufferManager = null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 183d7f6..e308dd8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -667,6 +667,7 @@
}
pReader.close();
joiner.closeJoin(writer);
+ joiner.closeTable();
}
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
index b42cdb7..9584f26 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/structures/SerializableHashTable.java
@@ -173,13 +173,18 @@
@Override
public void close() {
int nFrames = contents.size();
- for (int i = 0; i < headers.length; i++)
- headers[i] = null;
+ int hFrames = 0;
+ for (int i = 0; i < headers.length; i++) {
+ if (headers[i] != null) {
+ hFrames++;
+ headers[i] = null;
+ }
+ }
contents.clear();
frameCurrentIndex.clear();
tupleCount = 0;
currentLargestFrameIndex = 0;
- ctx.deallocateFrames(nFrames);
+ ctx.deallocateFrames((nFrames + hFrames) * frameCapacity * 4);
}
private void insertNewEntry(IntSerDeBuffer header, int headerOffset, int entryCapacity, TuplePointer pointer)