[NO ISSUE][STO] Add concurrency control for filters
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Memory components' filters are mutable and will be updated upon writes.
However, previously there is no any concurrency control for concurrent
updates of filters. This patch adds a read write lock to address this
problem.
Change-Id: I77eb348b84447dc552841f23b3a922a4316fa305
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5363
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Luo Chen <cluo8@uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
index 6ccb114..12d92a8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,6 +33,7 @@
public class LSMComponentFilter implements ILSMComponentFilter {
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final IBinaryComparatorFactory[] filterCmpFactories;
private final ITreeIndexTupleWriter tupleWriter;
@@ -55,98 +58,123 @@
@Override
public void reset() {
- minTuple = null;
- maxTuple = null;
- minTupleBytes = null;
- maxTupleBytes = null;
- minTupleBuf = null;
- maxTupleBuf = null;
+ lock.writeLock().lock();
+ try {
+ minTuple = null;
+ maxTuple = null;
+ minTupleBytes = null;
+ maxTupleBytes = null;
+ minTupleBuf = null;
+ maxTupleBuf = null;
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
public void update(ITupleReference tuple, MultiComparator cmp, IExtendedModificationOperationCallback opCallback)
throws HyracksDataException {
- boolean logged = false;
- if (minTuple == null) {
- int numBytes = tupleWriter.bytesRequired(tuple);
- minTupleBytes = new byte[numBytes];
- opCallback.after(tuple);
- logged = true;
- tupleWriter.writeTuple(tuple, minTupleBytes, 0);
- minTupleBuf = ByteBuffer.wrap(minTupleBytes);
- minTuple = tupleWriter.createTupleReference();
- ((ITreeIndexTupleReference) minTuple).resetByTupleOffset(minTupleBuf.array(), 0);
- } else {
- int c = cmp.compare(tuple, minTuple);
- if (c < 0) {
+ lock.writeLock().lock();
+ try {
+ boolean logged = false;
+ if (minTuple == null) {
+ int numBytes = tupleWriter.bytesRequired(tuple);
+ minTupleBytes = new byte[numBytes];
opCallback.after(tuple);
logged = true;
- int numBytes = tupleWriter.bytesRequired(tuple);
- if (minTupleBytes.length < numBytes) {
- minTupleBytes = new byte[numBytes];
- tupleWriter.writeTuple(tuple, minTupleBytes, 0);
- minTupleBuf = ByteBuffer.wrap(minTupleBytes);
- } else {
- tupleWriter.writeTuple(tuple, minTupleBytes, 0);
- }
+ tupleWriter.writeTuple(tuple, minTupleBytes, 0);
+ minTupleBuf = ByteBuffer.wrap(minTupleBytes);
+ minTuple = tupleWriter.createTupleReference();
((ITreeIndexTupleReference) minTuple).resetByTupleOffset(minTupleBuf.array(), 0);
+ } else {
+ int c = cmp.compare(tuple, minTuple);
+ if (c < 0) {
+ opCallback.after(tuple);
+ logged = true;
+ int numBytes = tupleWriter.bytesRequired(tuple);
+ if (minTupleBytes.length < numBytes) {
+ minTupleBytes = new byte[numBytes];
+ tupleWriter.writeTuple(tuple, minTupleBytes, 0);
+ minTupleBuf = ByteBuffer.wrap(minTupleBytes);
+ } else {
+ tupleWriter.writeTuple(tuple, minTupleBytes, 0);
+ }
+ ((ITreeIndexTupleReference) minTuple).resetByTupleOffset(minTupleBuf.array(), 0);
+ }
}
- }
- if (maxTuple == null) {
- int numBytes = tupleWriter.bytesRequired(tuple);
- maxTupleBytes = new byte[numBytes];
- if (!logged) {
- opCallback.after(tuple);
- }
- tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
- maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
- maxTuple = tupleWriter.createTupleReference();
- ((ITreeIndexTupleReference) maxTuple).resetByTupleOffset(maxTupleBuf.array(), 0);
- } else {
- int c = cmp.compare(tuple, maxTuple);
- if (c > 0) {
+ if (maxTuple == null) {
+ int numBytes = tupleWriter.bytesRequired(tuple);
+ maxTupleBytes = new byte[numBytes];
if (!logged) {
opCallback.after(tuple);
}
- int numBytes = tupleWriter.bytesRequired(tuple);
- if (maxTupleBytes.length < numBytes) {
- maxTupleBytes = new byte[numBytes];
- tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
- maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
- } else {
- tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
- }
+ tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
+ maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
+ maxTuple = tupleWriter.createTupleReference();
((ITreeIndexTupleReference) maxTuple).resetByTupleOffset(maxTupleBuf.array(), 0);
+ } else {
+ int c = cmp.compare(tuple, maxTuple);
+ if (c > 0) {
+ if (!logged) {
+ opCallback.after(tuple);
+ }
+ int numBytes = tupleWriter.bytesRequired(tuple);
+ if (maxTupleBytes.length < numBytes) {
+ maxTupleBytes = new byte[numBytes];
+ tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
+ maxTupleBuf = ByteBuffer.wrap(maxTupleBytes);
+ } else {
+ tupleWriter.writeTuple(tuple, maxTupleBytes, 0);
+ }
+ ((ITreeIndexTupleReference) maxTuple).resetByTupleOffset(maxTupleBuf.array(), 0);
+ }
}
+ } finally {
+ lock.writeLock().unlock();
}
}
@Override
public ITupleReference getMinTuple() {
- return minTuple;
+ lock.readLock().lock();
+ try {
+ return minTuple;
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public ITupleReference getMaxTuple() {
- return maxTuple;
+ lock.readLock().lock();
+ try {
+ return maxTuple;
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
public boolean satisfy(ITupleReference minTuple, ITupleReference maxTuple, MultiComparator filterCmp)
throws HyracksDataException {
- if (maxTuple != null && this.minTuple != null) {
- int c = filterCmp.compare(maxTuple, this.minTuple);
- if (c < 0) {
- return false;
+ lock.readLock().lock();
+ try {
+ if (maxTuple != null && this.minTuple != null) {
+ int c = filterCmp.compare(maxTuple, this.minTuple);
+ if (c < 0) {
+ return false;
+ }
}
- }
- if (minTuple != null && this.maxTuple != null) {
- int c = filterCmp.compare(minTuple, this.maxTuple);
- if (c > 0) {
- return false;
+ if (minTuple != null && this.maxTuple != null) {
+ int c = filterCmp.compare(minTuple, this.maxTuple);
+ if (c > 0) {
+ return false;
+ }
}
+ return true;
+ } finally {
+ lock.readLock().unlock();
}
- return true;
}
}