[NO ISSUE][STO] Misc fixes in storage
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Blocking IO callback used to wait for any notification on the
callback before returning. The behaviour was fixed to only return
if the completion flag was set on afterFinalize.
- Reading and writing to and from memory component's didn't do any
locking and so, this could read to concurrency issues.
- Reading metadata values used to rely on pointables which can be
problematic because then the caller will need to latch/pin the
page correctly. To avoid this, readers of metadata pages will
always take a copy of the metadata.
Change-Id: I4bdc4c16a9c126d311378e56651632bbb4a50864
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2548
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java
index f9421a1..1251d91 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java
@@ -28,7 +28,7 @@
import org.apache.asterix.test.common.TestHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.data.std.util.DataUtils;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
@@ -106,7 +106,7 @@
StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, false);
// assert one disk component
Assert.assertEquals(1, lsmBtree.getDiskComponents().size());
- VoidPointable pointable = VoidPointable.FACTORY.createPointable();
+ ArrayBackedValueStorage pointable = new ArrayBackedValueStorage();
ComponentUtils.get(lsmBtree, key, pointable);
Assert.assertTrue(DataUtils.equals(pointable, value));
// ensure that we can search this component
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 0a968c8..2121327 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -56,6 +56,7 @@
import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
@@ -156,9 +157,10 @@
iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
dataflowHelper.open();
LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
- LongPointable longPointable = LongPointable.FACTORY.createPointable();
- ComponentUtils.get(btree, ComponentUtils.MARKER_LSN_KEY, longPointable);
- long lsn = longPointable.getLong();
+ ArrayBackedValueStorage buffer = new ArrayBackedValueStorage();
+
+ ComponentUtils.get(btree, ComponentUtils.MARKER_LSN_KEY, buffer);
+ long lsn = LongPointable.getLong(buffer.getByteArray(), buffer.getStartOffset());
int numOfMarkers = 0;
LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
long expectedMarkerId = markerId - 1;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index b9f0cc7..f027979 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -68,6 +69,7 @@
protected ILSMComponentId[] nextComponentIds;
protected final ILSMComponentIdGenerator idGenerator;
+ protected final ArrayBackedValueStorage buffer = new ArrayBackedValueStorage(Long.BYTES);
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
private final Map<ILSMComponentId, Long> componentLsnMap = new HashMap<>();
@@ -128,7 +130,7 @@
}
LongPointable markerLsn = LongPointable.FACTORY
.createPointable(ComponentUtils.getLong(opCtx.getComponentsToBeMerged().get(0).getMetadata(),
- ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
+ ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND, buffer));
opCtx.getNewComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
} else if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) {
// advance memory component indexes
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
index abe474b..a2ab15f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
@@ -34,6 +35,7 @@
public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback {
private final LongPointable pointable = LongPointable.FACTORY.createPointable();
private final ILSMIndex index;
+ private final ArrayBackedValueStorage buffer = new ArrayBackedValueStorage(Long.BYTES);
/**
* @param index:
@@ -53,7 +55,7 @@
long lsn;
try {
lsn = ComponentUtils.getLong(index.getCurrentMemoryComponent().getMetadata(), ComponentUtils.MARKER_LSN_KEY,
- ComponentUtils.NOT_FOUND);
+ ComponentUtils.NOT_FOUND, buffer);
} catch (HyracksDataException e) {
// Should never happen since this is a memory component
throw new IllegalStateException(e);
@@ -76,7 +78,7 @@
for (ILSMDiskComponent c : diskComponents) {
try {
long lsn = ComponentUtils.getLong(c.getMetadata(), ComponentUtils.MARKER_LSN_KEY,
- ComponentUtils.NOT_FOUND);
+ ComponentUtils.NOT_FOUND, buffer);
if (lsn != ComponentUtils.NOT_FOUND) {
return lsn;
}
@@ -101,7 +103,7 @@
if (c.isReadable()) {
try {
lsn = ComponentUtils.getLong(c.getMetadata(), ComponentUtils.MARKER_LSN_KEY,
- ComponentUtils.NOT_FOUND);
+ ComponentUtils.NOT_FOUND, buffer);
} catch (HyracksDataException e) {
// Should never happen since this is a memory component
throw new IllegalStateException(e);
@@ -117,7 +119,11 @@
@Override
public void after(long lsn) {
pointable.setLong(lsn);
- index.getCurrentMemoryComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, pointable);
+ try {
+ index.getCurrentMemoryComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, pointable);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
}
public ILSMIndex getIndex() {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 8635efd..1846062 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -115,7 +115,7 @@
96 = Illegal attempt to enter empty component
97 = Illegal attempt to exit empty component
98 = A flush operation has failed
-99 = A merge operation has failed
+99 = A merge operation has failed. The component %1$s was found in the list of index components
100 = Failed to shutdown event processor for %1$s
101 = Page %1$s does not exist in file %2$s
102 = Failed to open virtual buffer cache since it is already open
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
index e075f4e..d5a4481 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
@@ -22,12 +22,22 @@
import java.io.IOException;
import java.util.Objects;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
-public class ArrayBackedValueStorage implements IMutableValueStorage {
+public class ArrayBackedValueStorage implements IMutableValueStorage, IPointable {
- private final GrowableArray data = new GrowableArray();
+ private final GrowableArray data;
+
+ public ArrayBackedValueStorage(int size) {
+ data = new GrowableArray(size);
+ }
+
+ public ArrayBackedValueStorage() {
+ data = new GrowableArray();
+ }
@Override
public void reset() {
@@ -54,16 +64,15 @@
return data.getLength();
}
- //TODO: don't swallow, but throw the exception
- public void append(IValueReference value) {
+ public void append(IValueReference value) throws HyracksDataException {
try {
data.append(value);
} catch (IOException e) {
- e.printStackTrace();
+ throw HyracksDataException.create(e);
}
}
- public void assign(IValueReference value) {
+ public void assign(IValueReference value) throws HyracksDataException {
reset();
append(value);
}
@@ -89,4 +98,31 @@
return Objects.equals(data, other.data);
}
+ @Override
+ public void set(byte[] bytes, int start, int length) {
+ reset();
+ if (bytes != null) {
+ try {
+ data.append(bytes, start, length);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public void set(IValueReference pointer) {
+ try {
+ assign(pointer);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public byte[] toByteArray() {
+ byte[] byteArray = new byte[getLength()];
+ System.arraycopy(getByteArray(), getStartOffset(), byteArray, 0, getLength());
+ return byteArray;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java
index 994d286..12486d9 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java
@@ -26,8 +26,18 @@
import org.apache.hyracks.data.std.api.IValueReference;
public class GrowableArray implements IDataOutputProvider {
- private final ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
- private final RewindableDataOutputStream dos = new RewindableDataOutputStream(baaos);
+ private final ByteArrayAccessibleOutputStream baaos;
+ private final RewindableDataOutputStream dos;
+
+ public GrowableArray() {
+ baaos = new ByteArrayAccessibleOutputStream();
+ dos = new RewindableDataOutputStream(baaos);
+ }
+
+ public GrowableArray(int size) {
+ baaos = new ByteArrayAccessibleOutputStream(size);
+ dos = new RewindableDataOutputStream(baaos);
+ }
@Override
public DataOutput getDataOutput() {
@@ -65,7 +75,11 @@
}
public void append(IValueReference value) throws IOException {
- dos.write(value.getByteArray(), value.getStartOffset(), value.getLength());
+ append(value.getByteArray(), value.getStartOffset(), value.getLength());
+ }
+
+ public void append(byte[] data, int offset, int length) throws IOException {
+ dos.write(data, offset, length);
}
public void setSize(int bytesRequired) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
index 5dee557..fa69d7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java
@@ -19,8 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.api;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public interface IComponentMetadata {
@@ -41,14 +41,5 @@
* @param value
* @throws HyracksDataException
*/
- void get(IValueReference key, IPointable value) throws HyracksDataException;
-
- /**
- * Get the value
- *
- * @param key
- * @return
- * @throws HyracksDataException
- */
- IValueReference get(IValueReference key) throws HyracksDataException;
+ void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 26c7b0d..c4616d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -36,6 +37,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private final DiskComponentMetadata metadata;
+ private final ArrayBackedValueStorage buffer = new ArrayBackedValueStorage(Long.BYTES);
// a variable cache of componentId stored in metadata.
// since componentId is immutable, we do not want to read from metadata every time the componentId
@@ -121,7 +123,7 @@
}
synchronized (this) {
if (componentId == null) {
- componentId = LSMComponentIdUtils.readFrom(metadata);
+ componentId = LSMComponentIdUtils.readFrom(metadata, buffer);
}
}
if (componentId.missing()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
index 042720c..a8ee286 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
@@ -34,7 +34,7 @@
}
public synchronized void waitForIO() throws InterruptedException {
- if (!notified) {
+ while (!notified) {
wait();
}
notified = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
index d1244ce..649989c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
@@ -19,9 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
@@ -39,17 +38,10 @@
}
@Override
- public void get(IValueReference key, IPointable value) throws HyracksDataException {
+ public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
mdpManager.get(mdpManager.createMetadataFrame(), key, value);
}
- @Override
- public IValueReference get(IValueReference key) throws HyracksDataException {
- VoidPointable value = VoidPointable.FACTORY.createPointable();
- get(key, value);
- return value;
- }
-
public void put(MemoryComponentMetadata metadata) throws HyracksDataException {
metadata.copy(mdpManager);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
index 7d1925b..d0fe8a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java
@@ -19,8 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
public class EmptyDiskComponentMetadata extends DiskComponentMetadata {
public static final EmptyDiskComponentMetadata INSTANCE = new EmptyDiskComponentMetadata();
@@ -35,12 +35,7 @@
}
@Override
- public void get(IValueReference key, IPointable value) throws HyracksDataException {
- throw new IllegalStateException("Attempt to read metadata of empty component");
- }
-
- @Override
- public IValueReference get(IValueReference key) throws HyracksDataException {
+ public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
throw new IllegalStateException("Attempt to read metadata of empty component");
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index eed8f6e..59f48d4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -601,6 +601,7 @@
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException {
if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+ LOGGER.info("Failed to enter components for merge operation. Calling finalize");
ctx.setIoOperationType(LSMIOOperationType.MERGE);
callback.afterFinalize(ctx);
return;
@@ -871,10 +872,12 @@
scheduleMerge(ctx, ioCallback);
}
IOOperationUtils.waitForIoOperation(ioCallback);
- // ensure that merge has succeeded
- for (ILSMDiskComponent component : toBeDeleted) {
- if (lsmIndex.getDiskComponents().contains(component)) {
- throw HyracksDataException.create(ErrorCode.A_MERGE_OPERATION_HAS_FAILED);
+ synchronized (opTracker) {
+ // ensure that merge has succeeded
+ for (ILSMDiskComponent component : toBeDeleted) {
+ if (lsmIndex.getDiskComponents().contains(component)) {
+ throw HyracksDataException.create(ErrorCode.A_MERGE_OPERATION_HAS_FAILED, component.toString());
+ }
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
index 3179790..e73fa0a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
@@ -20,10 +20,10 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -35,61 +35,94 @@
public class MemoryComponentMetadata implements IComponentMetadata {
private static final Logger LOGGER = LogManager.getLogger();
- private static final byte[] empty = new byte[0];
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final List<org.apache.commons.lang3.tuple.Pair<IValueReference, ArrayBackedValueStorage>> store =
new ArrayList<>();
/**
* Note: for memory metadata, it is expected that the key will be constant
+ *
+ * @throws HyracksDataException
*/
@Override
- public void put(IValueReference key, IValueReference value) {
- ArrayBackedValueStorage stored = get(key);
- if (stored == null) {
- stored = new ArrayBackedValueStorage();
- store.add(Pair.of(key, stored));
+ public void put(IValueReference key, IValueReference value) throws HyracksDataException {
+ lock.writeLock().lock();
+ try {
+ ArrayBackedValueStorage stored = get(key);
+ if (stored == null) {
+ stored = new ArrayBackedValueStorage();
+ store.add(Pair.of(key, stored));
+ }
+ stored.assign(value);
+ } finally {
+ lock.writeLock().unlock();
}
- stored.assign(value);
}
/**
* Note: for memory metadata, it is expected that the key will be constant
+ *
+ * @throws HyracksDataException
*/
@Override
- public void get(IValueReference key, IPointable value) {
- value.set(empty, 0, 0);
- ArrayBackedValueStorage stored = get(key);
- if (stored != null) {
- value.set(stored);
+ public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException {
+ lock.readLock().lock();
+ try {
+ value.reset();
+ ArrayBackedValueStorage stored = get(key);
+ if (stored != null) {
+ value.append(stored);
+ }
+ } finally {
+ lock.readLock().unlock();
}
}
- @Override
- public ArrayBackedValueStorage get(IValueReference key) {
- for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
- if (pair.getKey().equals(key)) {
- return pair.getValue();
+ private ArrayBackedValueStorage get(IValueReference key) {
+ lock.readLock().lock();
+ try {
+ for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
+ if (pair.getKey().equals(key)) {
+ return pair.getValue();
+ }
}
+ return null;
+ } finally {
+ lock.readLock().unlock();
}
- return null;
}
public void copy(IMetadataPageManager mdpManager) throws HyracksDataException {
- LOGGER.log(Level.INFO, "Copying Metadata into a different component");
- ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame();
- for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO, "Copying " + pair.getKey() + " : " + pair.getValue().getLength() + " bytes");
+ lock.readLock().lock();
+ try {
+ LOGGER.log(Level.INFO, "Copying Metadata into a different component");
+ ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame();
+ for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.log(Level.INFO, "Copying " + pair.getKey() + " : " + pair.getValue().getLength() + " bytes");
+ }
+ mdpManager.put(frame, pair.getKey(), pair.getValue());
}
- mdpManager.put(frame, pair.getKey(), pair.getValue());
+ } finally {
+ lock.readLock().unlock();
}
}
public void copy(DiskComponentMetadata metadata) throws HyracksDataException {
- metadata.put(this);
+ lock.readLock().lock();
+ try {
+ metadata.put(this);
+ } finally {
+ lock.readLock().unlock();
+ }
}
public void reset() {
- store.clear();
+ lock.writeLock().lock();
+ try {
+ store.clear();
+ } finally {
+ lock.writeLock().unlock();
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 94a3702..4b7f338 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
@@ -59,10 +60,10 @@
* @throws HyracksDataException
* If the comopnent was a disk component and an IO error was encountered
*/
- public static long getLong(IComponentMetadata metadata, IValueReference key, long defaultValue)
- throws HyracksDataException {
- IValueReference value = metadata.get(key);
- return value == null || value.getLength() == 0 ? defaultValue
+ public static long getLong(IComponentMetadata metadata, IValueReference key, long defaultValue,
+ ArrayBackedValueStorage value) throws HyracksDataException {
+ metadata.get(key, value);
+ return value.getLength() == 0 ? defaultValue
: LongPointable.getLong(value.getByteArray(), value.getStartOffset());
}
@@ -73,31 +74,36 @@
*
* @param index
* @param key
- * @param pointable
+ * @param value
* @throws HyracksDataException
*/
- public static void get(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException {
+ public static void get(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value)
+ throws HyracksDataException {
boolean loggable = LOGGER.isDebugEnabled();
+ value.reset();
if (loggable) {
LOGGER.log(Level.DEBUG, "Getting " + key + " from index " + index);
}
// Lock the opTracker to ensure index components don't change
synchronized (index.getOperationTracker()) {
- index.getCurrentMemoryComponent().getMetadata().get(key, pointable);
- if (pointable.getLength() == 0) {
+ ILSMMemoryComponent cmc = index.getCurrentMemoryComponent();
+ if (cmc.isReadable()) {
+ index.getCurrentMemoryComponent().getMetadata().get(key, value);
+ }
+ if (value.getLength() == 0) {
if (loggable) {
LOGGER.log(Level.DEBUG, key + " was not found in mutable memory component of " + index);
}
// was not found in the in current mutable component, search in the other in memory components
- fromImmutableMemoryComponents(index, key, pointable);
- if (pointable.getLength() == 0) {
+ fromImmutableMemoryComponents(index, key, value);
+ if (value.getLength() == 0) {
if (loggable) {
LOGGER.log(Level.DEBUG, key + " was not found in all immmutable memory components of " + index);
}
// was not found in the in all in memory components, search in the disk components
- fromDiskComponents(index, key, pointable);
+ fromDiskComponents(index, key, value);
if (loggable) {
- if (pointable.getLength() == 0) {
+ if (value.getLength() == 0) {
LOGGER.log(Level.DEBUG, key + " was not found in all disk components of " + index);
} else {
LOGGER.log(Level.DEBUG, key + " was found in disk components of " + index);
@@ -134,7 +140,7 @@
}
}
- private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable)
+ private static void fromDiskComponents(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value)
throws HyracksDataException {
boolean loggable = LOGGER.isDebugEnabled();
if (loggable) {
@@ -144,15 +150,16 @@
if (loggable) {
LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components " + c);
}
- c.getMetadata().get(key, pointable);
- if (pointable.getLength() != 0) {
+ c.getMetadata().get(key, value);
+ if (value.getLength() != 0) {
// Found
return;
}
}
}
- private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, IPointable pointable) {
+ private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key,
+ ArrayBackedValueStorage value) throws HyracksDataException {
boolean loggable = LOGGER.isDebugEnabled();
if (loggable) {
LOGGER.log(Level.DEBUG, "Getting " + key + " from immutable memory components of " + index);
@@ -174,8 +181,8 @@
}
ILSMMemoryComponent c = index.getMemoryComponents().get(next);
if (c.isReadable()) {
- c.getMetadata().get(key, pointable);
- if (pointable.getLength() != 0) {
+ c.getMetadata().get(key, value);
+ if (value.getLength() != 0) {
// Found
return;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
index 3c88543..6d4b0a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
@@ -20,6 +20,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -37,9 +38,10 @@
}
- public static ILSMComponentId readFrom(IComponentMetadata metadata) throws HyracksDataException {
- long minId = ComponentUtils.getLong(metadata, COMPONENT_ID_MIN_KEY, LSMComponentId.NOT_FOUND);
- long maxId = ComponentUtils.getLong(metadata, COMPONENT_ID_MAX_KEY, LSMComponentId.NOT_FOUND);
+ public static ILSMComponentId readFrom(IComponentMetadata metadata, ArrayBackedValueStorage buffer)
+ throws HyracksDataException {
+ long minId = ComponentUtils.getLong(metadata, COMPONENT_ID_MIN_KEY, LSMComponentId.NOT_FOUND, buffer);
+ long maxId = ComponentUtils.getLong(metadata, COMPONENT_ID_MAX_KEY, LSMComponentId.NOT_FOUND, buffer);
if (minId == LSMComponentId.NOT_FOUND || maxId == LSMComponentId.NOT_FOUND) {
return LSMComponentId.MISSING_COMPONENT_ID;
} else {