[NO ISSUE] Report all BufferCache write failures.
- user model changes: no
- storage format changes: no
- interface changes: yes
+ IPageWriteFailureCallback: used to notify async
IO caller when something goes wrong.
Details:
- Before this change, it is possible for failures to
be lost and for bulkload operations to not be
aware of failure to write some pages. This can be
dangerous.
- To avoid this, when sending a page to be written
a PageWriteFailureCallback is associated with the
page to notify the caller that a failure took place.
Change-Id: I97fd3dccff85dab84d644359be6f66b15ee708ef
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2787
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Luo Chen <cluo8@uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
index 6c16bd1..2dce1cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public class BloomFilter {
@@ -275,7 +276,7 @@
return new BloomFilterBuilder(numElements, numHashes, numBitsPerElement);
}
- public class BloomFilterBuilder implements IIndexBulkLoader {
+ public class BloomFilterBuilder extends PageWriteFailureCallback implements IIndexBulkLoader {
private final long[] hashes = BloomFilter.createHashArray();
private final long estimatedNumElements;
private final int numHashes;
@@ -286,6 +287,7 @@
private final ICachedPage[] pages;
private ICachedPage metaDataPage = null;
+ @SuppressWarnings("squid:S1181") // Catch Throwable Must return all confiscated pages
public BloomFilterBuilder(long estimatedNumElemenets, int numHashes, int numBitsPerElement)
throws HyracksDataException {
if (!isActivated) {
@@ -303,11 +305,22 @@
actualNumElements = 0;
pages = new ICachedPage[numPages];
int currentPageId = 1;
- while (currentPageId <= numPages) {
- ICachedPage page = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
- initPage(page.getBuffer().array());
- pages[currentPageId - 1] = page;
- ++currentPageId;
+ try {
+ while (currentPageId <= numPages) {
+ ICachedPage page =
+ bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
+ initPage(page.getBuffer().array());
+ pages[currentPageId - 1] = page;
+ ++currentPageId;
+ }
+ } catch (Throwable th) {
+ // return confiscated pages
+ for (int i = 0; i < currentPageId; i++) {
+ if (pages[i] != null) {
+ bufferCache.returnPage(pages[i]);
+ }
+ }
+ throw th;
}
}
@@ -364,11 +377,14 @@
@Override
public void end() throws HyracksDataException {
allocateAndInitMetaDataPage();
- queue.put(metaDataPage);
+ queue.put(metaDataPage, this);
for (ICachedPage p : pages) {
- queue.put(p);
+ queue.put(p, this);
}
bufferCache.finishQueue();
+ if (hasFailed()) {
+ throw HyracksDataException.create(getFailure());
+ }
BloomFilter.this.numBits = numBits;
BloomFilter.this.numHashes = numHashes;
BloomFilter.this.numElements = actualNumElements;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index fb8770e..4fc8af9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -1012,9 +1012,7 @@
try {
int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple),
interiorFrame.getBytesRequiredToWriteTuple(tuple));
-
NodeFrontier leafFrontier = nodeFrontiers.get(0);
-
int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize;
int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace();
@@ -1045,12 +1043,11 @@
((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId);
- queue.put(leafFrontier.page);
+ queue.put(leafFrontier.page, this);
for (ICachedPage c : pagesToWrite) {
- queue.put(c);
+ queue.put(c, this);
}
pagesToWrite.clear();
-
splitKey.setRightPage(leafFrontier.pageId);
}
if (tupleSize > maxTupleSize) {
@@ -1155,7 +1152,7 @@
ICachedPage lastLeaf = nodeFrontiers.get(level).page;
int lastLeafPage = nodeFrontiers.get(level).pageId;
lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId));
- queue.put(lastLeaf);
+ queue.put(lastLeaf, this);
nodeFrontiers.get(level).page = null;
persistFrontiers(level + 1, lastLeafPage);
return;
@@ -1170,9 +1167,8 @@
((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage);
int finalPageId = freePageManager.takePage(metaFrame);
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
- queue.put(frontier.page);
+ queue.put(frontier.page, this);
frontier.pageId = finalPageId;
-
persistFrontiers(level + 1, finalPageId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
index b7987f8..f8a929d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.common.api;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
public interface IPageManager {
@@ -42,12 +43,14 @@
* 3. When we need to have a persisted state.
*
* Note: This method will not force indexes to disk driver using fsync
+ *
* @throws HyracksDataException
*/
- void close() throws HyracksDataException;
+ void close(IPageWriteFailureCallback callback) throws HyracksDataException;
/**
* Create a metadata frame to be used for reading and writing to metadata pages
+ *
* @return a new metadata frame
*/
ITreeIndexMetadataFrame createMetadataFrame();
@@ -87,6 +90,7 @@
/**
* Get the location of a block of free pages to use for index operations
* This is used for records that are larger than a normal page
+ *
* @param frame
* A metadata frame to use to wrap metadata pages
* @return The starting page location, or -1 if a block of free pages could be found or allocated
@@ -107,6 +111,7 @@
/**
* Add a page back to the pool of free pages within an index file
+ *
* @param frame
* A metadata frame to use to wrap metadata pages
* @param page
@@ -129,6 +134,7 @@
/**
* Check whether the index is empty or not.
+ *
* @param frame
* interior frame
* @param rootPage
@@ -140,6 +146,7 @@
/**
* Get the root page of the id
+ *
* @return the root page
* @throws HyracksDataException
*/
@@ -147,6 +154,7 @@
/**
* Get the first page to start the bulk load
+ *
* @return
* @throws HyracksDataException
*/
@@ -154,6 +162,7 @@
/**
* Set the root page id and finalize the bulk load operation
+ *
* @param rootPage
* @throws HyracksDataException
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 5c389d2..97e7ed7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager {
@@ -207,7 +208,7 @@
}
@Override
- public void close() throws HyracksDataException {
+ public void close(IPageWriteFailureCallback callback) throws HyracksDataException {
if (ready) {
IFIFOPageQueue queue = bufferCache.createFIFOQueue();
ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame();
@@ -220,7 +221,9 @@
}
int finalMetaPage = getMaxPageId(metaFrame) + 1;
confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
- queue.put(confiscatedPage);
+ // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
+ // won't be flushed to disk because it won't be dirty until the write latch has been released.
+ queue.put(confiscatedPage, callback);
bufferCache.finishQueue();
metadataPage = getMetadataPageId();
ready = false;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 951d824..e348e24 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -28,8 +28,14 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrameFactory;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+/**
+ * @deprecated
+ * This class must not be used. Instead, use {@link AppendOnlyLinkedMetadataPageManager}
+ */
+@Deprecated
public class LinkedMetaDataPageManager implements IMetadataPageManager {
private final IBufferCache bufferCache;
private int fileId = -1;
@@ -238,7 +244,7 @@
}
@Override
- public void close() throws HyracksDataException {
+ public void close(IPageWriteFailureCallback callback) throws HyracksDataException {
if (ready) {
ICachedPage metaNode =
bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()), false);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 905c99d..b77f14f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -26,19 +26,19 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
-import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public abstract class AbstractTreeIndex implements ITreeIndex {
@@ -95,7 +95,7 @@
freePageManager.open(fileId);
freePageManager.init(interiorFrameFactory, leafFrameFactory);
setRootPage();
- freePageManager.close();
+ freePageManager.close(HaltOnFailureCallback.INSTANCE);
failed = false;
} finally {
bufferCache.closeFile(fileId);
@@ -132,7 +132,7 @@
if (!isActive) {
throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_INACTIVE_INDEX);
}
- freePageManager.close();
+ freePageManager.close(HaltOnFailureCallback.INSTANCE);
bufferCache.closeFile(fileId);
isActive = false;
}
@@ -227,7 +227,7 @@
return fieldCount;
}
- public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader {
+ public abstract class AbstractTreeIndexBulkLoader extends PageWriteFailureCallback implements IIndexBulkLoader {
protected final MultiComparator cmp;
protected final int slotSize;
protected final int leafMaxBytes;
@@ -297,6 +297,9 @@
@Override
public void end() throws HyracksDataException {
bufferCache.finishQueue();
+ if (hasFailed()) {
+ throw HyracksDataException.create(getFailure());
+ }
freePageManager.setRootPageId(rootPage);
}
@@ -317,31 +320,6 @@
public void setLeafFrame(ITreeIndexFrame leafFrame) {
this.leafFrame = leafFrame;
}
-
- }
-
- public class TreeIndexInsertBulkLoader implements IIndexBulkLoader {
- ITreeIndexAccessor accessor;
-
- public TreeIndexInsertBulkLoader() throws HyracksDataException {
- accessor = (ITreeIndexAccessor) createAccessor(NoOpIndexAccessParameters.INSTANCE);
- }
-
- @Override
- public void add(ITupleReference tuple) throws HyracksDataException {
- accessor.insert(tuple);
- }
-
- @Override
- public void end() throws HyracksDataException {
- // do nothing
- }
-
- @Override
- public void abort() {
-
- }
-
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index a504f7e..8be75be 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -60,6 +60,7 @@
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.util.trace.ITracer;
/**
@@ -462,12 +463,15 @@
if (isTransaction) {
// Since this is a transaction component, validate and
// deactivate. it could later be added or deleted
- component.markAsValid(durable);
- ioOpCallback.afterFinalize(loadOp);
+ try {
+ component.markAsValid(durable, loadOp);
+ } finally {
+ ioOpCallback.afterFinalize(loadOp);
+ }
component.deactivate();
} else {
ioOpCallback.afterFinalize(loadOp);
- getHarness().addBulkLoadedComponent(component);
+ getHarness().addBulkLoadedComponent(loadOp);
}
}
} finally {
@@ -490,6 +494,21 @@
ioOpCallback.completed(loadOp);
}
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return componentBulkLoader.hasFailed();
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return componentBulkLoader.getFailure();
+ }
}
// The accessor for disk only indexes don't use modification callback and always carry the target index version with them
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index b727a39..5bcf30d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -65,6 +65,7 @@
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.util.trace.ITracer;
public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeIndex, ITwoPCIndex {
@@ -544,12 +545,15 @@
if (isTransaction) {
// Since this is a transaction component, validate and
// deactivate. it could later be added or deleted
- component.markAsValid(durable);
- ioOpCallback.afterFinalize(loadOp);
+ try {
+ component.markAsValid(durable, loadOp);
+ } finally {
+ ioOpCallback.afterFinalize(loadOp);
+ }
component.deactivate();
} else {
ioOpCallback.afterFinalize(loadOp);
- getHarness().addBulkLoadedComponent(component);
+ getHarness().addBulkLoadedComponent(loadOp);
}
}
} finally {
@@ -574,6 +578,21 @@
ioOpCallback.completed(loadOp);
}
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return componentBulkLoader.hasFailed();
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return componentBulkLoader.getFailure();
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
index 329a54b..6b6e5e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
public abstract class AbstractLSMWithBloomFilterDiskComponent extends AbstractLSMDiskComponent {
public AbstractLSMWithBloomFilterDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager,
@@ -42,11 +43,11 @@
public abstract IBufferCache getBloomFilterBufferCache();
@Override
- public void markAsValid(boolean persist) throws HyracksDataException {
+ public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
// The order of forcing the dirty page to be flushed is critical. The
// bloom filter must be always done first.
ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist);
- super.markAsValid(persist);
+ super.markAsValid(persist, callback);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
index cace9e5..f7feb78 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.IndexWithBuddyBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
public abstract class AbstractLSMWithBuddyDiskComponent extends AbstractLSMWithBloomFilterDiskComponent {
@@ -37,9 +38,9 @@
public abstract AbstractTreeIndex getBuddyIndex();
@Override
- public void markAsValid(boolean persist) throws HyracksDataException {
- super.markAsValid(persist);
- ComponentUtils.markAsValid(getBuddyIndex(), persist);
+ public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
+ super.markAsValid(persist, callback);
+ ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
index 1500f37..543779c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.ChainedLSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
public interface ILSMDiskComponent extends ILSMComponent {
@@ -68,9 +69,11 @@
*
* @param persist
* whether the call should force data to disk before returning
+ * @param callback
+ * callback for when a page write operation fails
* @throws HyracksDataException
*/
- void markAsValid(boolean persist) throws HyracksDataException;
+ void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException;
/**
* Activates the component
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index c4a0352..9e8c568 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -150,12 +150,12 @@
/**
* Add bulk loaded component
*
- * @param index
- * the new component
+ * @param ioOperation
+ * the io operation that added the new component
* @throws HyracksDataException
* @throws IndexException
*/
- void addBulkLoadedComponent(ILSMDiskComponent index) throws HyracksDataException;
+ void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException;
/**
* Get index operation tracker
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 3245455..0e13933 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -26,8 +26,9 @@
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
-public interface ILSMIOOperation extends Callable<LSMIOOperationStatus> {
+public interface ILSMIOOperation extends Callable<LSMIOOperationStatus>, IPageWriteFailureCallback {
/**
* Represents the io operation type
@@ -94,6 +95,7 @@
/**
* @return the failure in the io operation if any, null otherwise
*/
+ @Override
Throwable getFailure();
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java
index 08c75dc..9d62c0d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public class VirtualFreePageManager implements IPageManager {
@@ -88,7 +89,7 @@
}
@Override
- public void close() {
+ public void close(IPageWriteFailureCallback callback) {
// Method doesn't make sense for this free page manager.
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index fc9a362..3d76755 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
public abstract class AbstractIoOperation implements ILSMIOOperation {
@@ -37,7 +38,7 @@
protected final FileReference target;
protected final ILSMIOOperationCallback callback;
protected final String indexIdentifier;
- private Throwable failure;
+ private volatile Throwable failure;
private LSMIOOperationStatus status = LSMIOOperationStatus.SUCCESS;
private ILSMDiskComponent newComponent;
private boolean completed = false;
@@ -146,4 +147,14 @@
completeListeners.add(listener);
}
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ setFailure(failure);
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return status == LSMIOOperationStatus.FAILURE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index aa312fb..a88a19a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -156,8 +157,8 @@
* @throws HyracksDataException
*/
@Override
- public void markAsValid(boolean persist) throws HyracksDataException {
- ComponentUtils.markAsValid(getMetadataHolder(), persist);
+ public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
+ ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO, "Marked as valid component with id: " + getId());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
index 29ca388..a9c70e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java
@@ -21,6 +21,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
public class BloomFilterBulkLoader implements IChainedComponentBulkLoader {
@@ -65,4 +66,19 @@
endedBloomFilterLoad = true;
}
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return bulkLoader.hasFailed();
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return bulkLoader.getFailure();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index 6e0606a..ab59b59 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.util.annotations.CriticalPath;
/**
@@ -96,8 +97,9 @@
public void cleanupArtifacts() throws HyracksDataException {
if (!cleanedUpArtifacts) {
cleanedUpArtifacts = true;
- for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
- lsmOperation.cleanupArtifacts();
+ final int bulkloadersCount = bulkloaderChain.size();
+ for (int i = 0; i < bulkloadersCount; i++) {
+ bulkloaderChain.get(i).cleanupArtifacts();;
}
}
diskComponent.deactivateAndDestroy();
@@ -106,8 +108,9 @@
@Override
public void end() throws HyracksDataException {
if (!cleanedUpArtifacts) {
- for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
- lsmOperation.end();
+ final int bulkloadersCount = bulkloaderChain.size();
+ for (int i = 0; i < bulkloadersCount; i++) {
+ bulkloaderChain.get(i).end();
}
if (isEmptyComponent && cleanupEmptyComponent) {
cleanupArtifacts();
@@ -118,8 +121,9 @@
@Override
public void abort() throws HyracksDataException {
operation.setStatus(LSMIOOperationStatus.FAILURE);
- for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
- lsmOperation.abort();
+ final int bulkloadersCount = bulkloaderChain.size();
+ for (int i = 0; i < bulkloadersCount; i++) {
+ bulkloaderChain.get(i).abort();
}
}
@@ -127,4 +131,31 @@
public ILSMIOOperation getOperation() {
return operation;
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ final int bulkloadersCount = bulkloaderChain.size();
+ for (int i = 0; i < bulkloadersCount; i++) {
+ if (bulkloaderChain.get(i).hasFailed()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Throwable getFailure() {
+ final int bulkloadersCount = bulkloaderChain.size();
+ for (int i = 0; i < bulkloadersCount; i++) {
+ if (bulkloaderChain.get(i).hasFailed()) {
+ return bulkloaderChain.get(i).getFailure();
+ }
+ }
+ return null;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index 4c2ddb6..a2d9fdd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
public class EmptyComponent implements ILSMDiskComponent {
public static final EmptyComponent INSTANCE = new EmptyComponent();
@@ -105,7 +106,7 @@
}
@Override
- public void markAsValid(boolean persist) throws HyracksDataException {
+ public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
// No Op
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index ab70ba1..854e541 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -192,9 +192,18 @@
}
}
+ @SuppressWarnings("squid:S1181")
@Override
- public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException {
- c.markAsValid(lsmIndex.isDurable());
+ public void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException {
+ ILSMDiskComponent c = ioOperation.getNewComponent();
+ try {
+ c.markAsValid(lsmIndex.isDurable(), ioOperation);
+ } catch (Throwable th) {
+ ioOperation.setFailure(th);
+ }
+ if (ioOperation.hasFailed()) {
+ throw HyracksDataException.create(ioOperation.getFailure());
+ }
synchronized (opTracker) {
lsmIndex.addDiskComponent(c);
if (replicationEnabled) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
index 43d2b0d..880f5be 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager;
import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
public class FilterBulkLoader implements IChainedComponentBulkLoader {
@@ -79,4 +80,19 @@
filterTuple.reset(tuple);
filter.update(filterTuple, filterCmp, NoOpOperationCallback.INSTANCE);
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return false;
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return null;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
index 90ef127..1361c79 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java
@@ -20,8 +20,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
-public interface IChainedComponentBulkLoader {
+public interface IChainedComponentBulkLoader extends IPageWriteFailureCallback {
/**
* Adds a tuple to the bulkloaded component
*
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
index 4fb2919..394126d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
public class IndexWithBuddyBulkLoader implements IChainedComponentBulkLoader {
@@ -69,4 +70,24 @@
bulkLoader.abort();
buddyBTreeBulkLoader.abort();
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return bulkLoader.hasFailed() || buddyBTreeBulkLoader.hasFailed();
+ }
+
+ @Override
+ public Throwable getFailure() {
+ if (bulkLoader.hasFailed()) {
+ return bulkLoader.getFailure();
+ } else if (buddyBTreeBulkLoader.hasFailed()) {
+ return buddyBTreeBulkLoader.getFailure();
+ }
+ return null;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 4d840b0..3eea0a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -537,7 +537,7 @@
operation.setNewComponent(newComponent);
operation.getCallback().afterOperation(operation);
if (newComponent != null) {
- newComponent.markAsValid(lsmIndex.isDurable());
+ newComponent.markAsValid(lsmIndex.isDurable(), operation);
}
} catch (Throwable e) { // NOSONAR Must catch all
operation.setStatus(LSMIOOperationStatus.FAILURE);
@@ -613,9 +613,18 @@
return operation;
}
+ @SuppressWarnings("squid:S1181")
@Override
- public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException {
- c.markAsValid(lsmIndex.isDurable());
+ public void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException {
+ ILSMDiskComponent c = ioOperation.getNewComponent();
+ try {
+ c.markAsValid(lsmIndex.isDurable(), ioOperation);
+ } catch (Throwable th) {
+ ioOperation.setFailure(th);
+ }
+ if (ioOperation.hasFailed()) {
+ throw HyracksDataException.create(ioOperation.getFailure());
+ }
synchronized (opTracker) {
lsmIndex.addDiskComponent(c);
if (replicationEnabled) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
index 84857f4..977697b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
public class LSMIndexBulkLoader implements IChainedComponentBulkLoader {
private final IIndexBulkLoader bulkLoader;
@@ -64,4 +65,19 @@
public void abort() throws HyracksDataException {
bulkLoader.abort();
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return bulkLoader.hasFailed();
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return bulkLoader.getFailure();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 10074f9..3a43ba7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
private final AbstractLSMIndex lsmIndex;
@@ -79,7 +80,7 @@
}
if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
&& opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
- lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation().getNewComponent());
+ lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation());
}
} finally {
lsmIndex.getIOOperationCallback().completed(opCtx.getIoOperation());
@@ -100,4 +101,19 @@
}
}
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return opCtx.getIoOperation().hasFailed();
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return opCtx.getIoOperation().getFailure();
+ }
+
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
index f57c4ef..5ee1503 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
public class NoOpIoOperation implements ILSMIOOperation {
public static final NoOpIoOperation INSTANCE = new NoOpIoOperation();
@@ -126,4 +127,14 @@
return null;
}
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return false;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index f1172f3..3345e3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.util.trace.ITracer;
import org.apache.hyracks.util.trace.ITracer.Scope;
import org.apache.hyracks.util.trace.TraceUtils;
@@ -162,4 +163,14 @@
public Map<String, Object> getParameters() {
return ioOp.getParameters();
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ ioOp.writeFailed(page, failure);
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return ioOp.hasFailed();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 4b7f338..1ff9fa8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -33,6 +33,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -190,12 +191,14 @@
}
}
- public static void markAsValid(ITreeIndex treeIndex, boolean forceToDisk) throws HyracksDataException {
+ public static void markAsValid(ITreeIndex treeIndex, boolean forceToDisk, IPageWriteFailureCallback callback)
+ throws HyracksDataException {
int fileId = treeIndex.getFileId();
IBufferCache bufferCache = treeIndex.getBufferCache();
- treeIndex.getPageManager().close();
- // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page
- // won't be flushed to disk because it won't be dirty until the write latch has been released.
+ treeIndex.getPageManager().close(callback);
+ if (callback.hasFailed()) {
+ throw HyracksDataException.create(callback.getFailure());
+ }
// Force modified metadata page to disk.
// If the index is not durable, then the flush is not necessary.
if (forceToDisk) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index b030e83..41e72cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
public class LSMInvertedIndexDiskComponent extends AbstractLSMWithBuddyDiskComponent {
@@ -102,15 +103,19 @@
}
@Override
- public void markAsValid(boolean persist) throws HyracksDataException {
+ public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException {
ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist);
// Flush inverted index second.
invIndex.getBufferCache().force((invIndex).getInvListsFileId(), true);
- ComponentUtils.markAsValid(getMetadataHolder(), persist);
-
- // Flush deleted keys BTree.
- ComponentUtils.markAsValid(getBuddyIndex(), persist);
+ ComponentUtils.markAsValid(getMetadataHolder(), persist, callback);
+ if (!callback.hasFailed()) {
+ // Flush deleted keys BTree.
+ ComponentUtils.markAsValid(getBuddyIndex(), persist, callback);
+ }
+ if (callback.hasFailed()) {
+ throw HyracksDataException.create(callback.getFailure());
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index c3c9c21..0b504a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -61,6 +61,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
+import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
/**
@@ -231,7 +232,8 @@
listCursor.open(initState, null);
}
- public abstract class AbstractOnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
+ public abstract class AbstractOnDiskInvertedIndexBulkLoader extends PageWriteFailureCallback
+ implements IIndexBulkLoader {
protected final ArrayTupleBuilder btreeTupleBuilder;
protected final ArrayTupleReference btreeTupleReference;
protected final IIndexBulkLoader btreeBulkloader;
@@ -272,7 +274,7 @@
}
protected void pinNextPage() throws HyracksDataException {
- queue.put(currentPage);
+ queue.put(currentPage, this);
currentPageId++;
currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId));
}
@@ -352,10 +354,13 @@
btreeBulkloader.end();
if (currentPage != null) {
- queue.put(currentPage);
+ queue.put(currentPage, this);
}
invListsMaxPageId = currentPageId;
bufferCache.finishQueue();
+ if (hasFailed()) {
+ throw HyracksDataException.create(getFailure());
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index f902153..5e8bd0b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -59,6 +59,7 @@
import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.ISearchPredicate;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.util.trace.ITracer;
/**
@@ -499,12 +500,15 @@
if (isTransaction) {
// Since this is a transaction component, validate and
// deactivate. it could later be added or deleted
- component.markAsValid(durable);
- ioOpCallback.afterFinalize(loadOp);
+ try {
+ component.markAsValid(durable, loadOp);
+ } finally {
+ ioOpCallback.afterFinalize(loadOp);
+ }
component.deactivate();
} else {
ioOpCallback.afterFinalize(loadOp);
- getHarness().addBulkLoadedComponent(component);
+ getHarness().addBulkLoadedComponent(loadOp);
}
}
} finally {
@@ -529,6 +533,21 @@
ioOpCallback.completed(loadOp);
}
}
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return loadOp.hasFailed();
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return loadOp.getFailure();
+ }
}
// The only change the the schedule merge is the method used to create the
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
index 0e455c5..f12f423 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java
@@ -941,11 +941,10 @@
propagateBulk(1, false, pagesToWrite);
leafFrontier.pageId = freePageManager.takePage(metaFrame);
- queue.put(leafFrontier.page);
+ queue.put(leafFrontier.page, this);
for (ICachedPage c : pagesToWrite) {
- queue.put(c);
+ queue.put(c, this);
}
-
pagesToWrite.clear();
leafFrontier.page = bufferCache
.confiscatePage(BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId));
@@ -975,7 +974,7 @@
}
for (ICachedPage c : pagesToWrite) {
- queue.put(c);
+ queue.put(c, this);
}
finish();
super.end();
@@ -1011,7 +1010,7 @@
((RTreeNSMFrame) lowerFrame).adjustMBR();
interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0);
}
- queue.put(n.page);
+ queue.put(n.page, this);
n.page = null;
prevPageId = n.pageId;
}
@@ -1021,7 +1020,6 @@
protected void propagateBulk(int level, boolean toRoot, List<ICachedPage> pagesToWrite)
throws HyracksDataException {
- boolean propagated = false;
if (level == 1) {
lowerFrame = leafFrame;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index 67d32dd..a76fe48 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -54,6 +54,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java
index d6c954e..138705f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java
@@ -20,8 +20,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
-public interface IIndexBulkLoader {
+public interface IIndexBulkLoader extends IPageWriteFailureCallback {
/**
* Append a tuple to the index in the context of a bulk load.
*
@@ -36,6 +37,7 @@
/**
* Finalize the bulk loading operation in the given context and release all resources.
+ * After this method is called, caller can't add more tuples nor abort
*
* @throws HyracksDataException
* If the BufferCache throws while un/pinning or un/latching.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
index dbead1e..589d697 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AsyncFIFOPageQueueManager.java
@@ -24,15 +24,19 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class AsyncFIFOPageQueueManager implements Runnable {
- private final static boolean DEBUG = false;
+ private static final boolean DEBUG = false;
+ private static final Logger LOGGER = LogManager.getLogger();
- protected LinkedBlockingQueue<ICachedPage> queue = new LinkedBlockingQueue<ICachedPage>();
+ protected LinkedBlockingQueue<ICachedPage> queue = new LinkedBlockingQueue<>();
volatile Thread writerThread;
protected AtomicBoolean poisoned = new AtomicBoolean(false);
protected BufferCache bufferCache;
- volatile protected PageQueue pageQueue;
+ protected volatile PageQueue pageQueue;
public AsyncFIFOPageQueueManager(BufferCache bufferCache) {
this.bufferCache = bufferCache;
@@ -57,17 +61,27 @@
return writer;
}
+ @SuppressWarnings("squid:S2142")
@Override
- public void put(ICachedPage page) throws HyracksDataException {
+ public void put(ICachedPage page, IPageWriteFailureCallback callback) throws HyracksDataException {
+ failIfPreviousPageFailed(callback);
+ page.setFailureCallback(callback);
try {
if (!poisoned.get()) {
queue.put(page);
} else {
- throw new HyracksDataException("Queue is closing");
+ LOGGER.error("An attempt to write a page found buffer cache closed");
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
}
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
+ LOGGER.error("IO Operation interrupted", e);
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+ }
+ }
+
+ private void failIfPreviousPageFailed(IPageWriteFailureCallback callback) throws HyracksDataException {
+ if (callback.hasFailed()) {
+ throw HyracksDataException.create(callback.getFailure());
}
}
}
@@ -136,18 +150,21 @@
}
}
+ @SuppressWarnings("squid:S2142")
@Override
public void run() {
- if (DEBUG)
- System.out.println("[FIFO] Writer started");
+ if (DEBUG) {
+ LOGGER.info("[FIFO] Writer started");
+ }
boolean die = false;
while (!die) {
ICachedPage entry;
try {
entry = queue.take();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
+ LOGGER.error("BufferCache Write Queue was interrupted", e);
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+ return; // Keep compiler happy
}
if (entry.getQueueInfo() != null && entry.getQueueInfo().hasWaiters()) {
synchronized (entry) {
@@ -158,17 +175,11 @@
continue;
}
}
-
- if (DEBUG)
- System.out.println("[FIFO] Write " + BufferedFileHandle.getFileId(((CachedPage) entry).dpid) + ","
+ if (DEBUG) {
+ LOGGER.info("[FIFO] Write " + BufferedFileHandle.getFileId(((CachedPage) entry).dpid) + ","
+ BufferedFileHandle.getPageId(((CachedPage) entry).dpid));
-
- try {
- pageQueue.getWriter().write(entry, bufferCache);
- } catch (HyracksDataException e) {
- //TODO: What do we do, if we could not write the page?
- e.printStackTrace();
}
+ pageQueue.getWriter().write(entry, bufferCache);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
index 02eb8bf..6ec12aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java
@@ -23,10 +23,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
/**
* @author yingyib
*/
public class CachedPage implements ICachedPageInternal {
+ private static final Logger LOGGER = LogManager.getLogger();
final int cpid;
ByteBuffer buffer;
public final AtomicInteger pinCount;
@@ -44,6 +48,7 @@
// DEBUG
private static final boolean DEBUG = false;
private final StackTraceElement[] ctorStack;
+ private IPageWriteFailureCallback failureCallback;
//Constructor for making dummy entry for FIFO queue
public CachedPage() {
@@ -85,6 +90,7 @@
confiscated.set(false);
pageReplacementStrategy.notifyCachePageReset(this);
queueInfo = null;
+ failureCallback = null;
}
public void invalidate() {
@@ -103,11 +109,7 @@
@Override
public boolean isGoodVictim() {
- if (confiscated.get()) {
- return false; // i am not a good victim because i cant flush!
- } else {
- return pinCount.get() == 0;
- }
+ return !confiscated.get() && pinCount.get() == 0;
}
@Override
@@ -205,4 +207,21 @@
public boolean isLargePage() {
return multiplier > 1;
}
+
+ @Override
+ public void setFailureCallback(IPageWriteFailureCallback failureCallback) {
+ if (this.failureCallback != null) {
+ throw new IllegalStateException("failureCallback is already set");
+ }
+ this.failureCallback = failureCallback;
+ }
+
+ @Override
+ public void writeFailed(Exception e) {
+ if (failureCallback != null) {
+ failureCallback.writeFailed(this, e);
+ } else {
+ LOGGER.error("An IO Failure took place but the failure callback is not set", e);
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 856edbc..3d3ce3c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -15,35 +15,37 @@
package org.apache.hyracks.storage.common.buffercache;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class FIFOLocalWriter implements IFIFOPageWriter {
+ private static final Logger LOGGER = LogManager.getLogger();
public static final FIFOLocalWriter INSTANCE = new FIFOLocalWriter();
- private static boolean DEBUG = false;
+ private static final boolean DEBUG = false;
private FIFOLocalWriter() {
}
+ @SuppressWarnings("squid:S1181") // System must halt on all IO errors
@Override
- public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException {
+ public void write(ICachedPage page, BufferCache bufferCache) {
CachedPage cPage = (CachedPage) page;
try {
bufferCache.write(cPage);
- } catch (HyracksDataException e) {
- if (e.getErrorCode() != ErrorCode.FILE_DOES_NOT_EXIST) {
- throw HyracksDataException.create(e);
- }
+ } catch (Exception e) {
+ page.writeFailed(e);
+ LOGGER.warn("Failed to write page {}", cPage, e);
+ } catch (Throwable th) {
+ // Halt
+ LOGGER.error("FIFOLocalWriter has encountered a fatal error", th);
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
} finally {
bufferCache.returnPage(cPage);
if (DEBUG) {
- System.out.println("[FIFO] Return page: " + cPage.cpid + "," + cPage.dpid);
+ LOGGER.error("[FIFO] Return page: {}, {}", cPage.cpid, cPage.dpid);
}
}
}
- @Override
- public void sync(int fileId, BufferCache bufferCache) throws HyracksDataException {
- bufferCache.force(fileId, true);
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java
new file mode 100644
index 0000000..0b748e1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HaltOnFailureCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import org.apache.hyracks.util.ExitUtil;
+
+public class HaltOnFailureCallback implements IPageWriteFailureCallback {
+ public static final HaltOnFailureCallback INSTANCE = new HaltOnFailureCallback();
+
+ private HaltOnFailureCallback() {
+ }
+
+ @Override
+ public void writeFailed(ICachedPage page, Throwable failure) {
+ ExitUtil.halt(ExitUtil.EC_ABNORMAL_TERMINATION);
+ }
+
+ @Override
+ public boolean hasFailed() {
+ return false;
+ }
+
+ @Override
+ public Throwable getFailure() {
+ return null;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
index 16837b9..cfbb145 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPage.java
@@ -44,10 +44,14 @@
void setDiskPageId(long dpid);
+ void setFailureCallback(IPageWriteFailureCallback callback);
+
/**
* Check if a page is a large page
*
* @return true if the page is large, false otherwise
*/
boolean isLargePage();
+
+ void writeFailed(Exception e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
index c500286..d900852 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java
@@ -19,13 +19,16 @@
package org.apache.hyracks.storage.common.buffercache;
public interface ICachedPageInternal extends ICachedPage {
- public int getCachedPageId();
+ int getCachedPageId();
- public long getDiskPageId();
+ long getDiskPageId();
- public Object getReplacementStrategyObject();
+ Object getReplacementStrategyObject();
- public boolean isGoodVictim();
+ /**
+ * @return true if can be evicted, false otherwise
+ */
+ boolean isGoodVictim();
void setFrameSizeMultiplier(int multiplier);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
index 6c03671..189c402 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageQueue.java
@@ -19,5 +19,20 @@
@FunctionalInterface
public interface IFIFOPageQueue {
- void put(ICachedPage page) throws HyracksDataException;
+
+ /**
+ * Put a page in the write queue
+ *
+ * @param page
+ * the page to be written
+ * @param callback
+ * callback in case of a failure
+ * @throws HyracksDataException
+ * if the callback has already failed. This indicates a failure writing a previous page
+ * in the same operation.
+ * Note: having this failure at this place removes the need to check for failures with
+ * every add() call in the bulk loader and so, we check per page given to disk rather
+ * than per tuple given to loader. At the same time, it allows the bulk load to fail early.
+ */
+ void put(ICachedPage page, IPageWriteFailureCallback callback) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
index 567c01e..26fd414 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IFIFOPageWriter.java
@@ -15,10 +15,7 @@
package org.apache.hyracks.storage.common.buffercache;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
+@FunctionalInterface
public interface IFIFOPageWriter {
- public void write(ICachedPage page, BufferCache bufferCache) throws HyracksDataException;
-
- void sync(int fileId, BufferCache bufferCache) throws HyracksDataException;
+ void write(ICachedPage page, BufferCache bufferCache);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java
new file mode 100644
index 0000000..da9cb6a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteFailureCallback.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+public interface IPageWriteFailureCallback {
+
+ /**
+ * Notify that an async write operation has failed
+ *
+ * @param page
+ * @param failure
+ */
+ void writeFailed(ICachedPage page, Throwable failure);
+
+ /**
+ * @return true if the callback has received any failure
+ */
+ boolean hasFailed();
+
+ /**
+ * @return a failure writing to disk or null if no failure has been seen
+ * This doesn't guarantee which failure is returned but that if one or more failures occurred
+ * while trying to write to disk, one of those failures is returned. All other failures are expected
+ * to be logged.
+ */
+ Throwable getFailure();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java
new file mode 100644
index 0000000..c11e596
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/PageWriteFailureCallback.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+public class PageWriteFailureCallback implements IPageWriteFailureCallback {
+
+ private volatile Throwable failure;
+
+ @Override
+ public final void writeFailed(ICachedPage page, Throwable failure) {
+ if (this.failure == null) {
+ this.failure = failure;
+ }
+ }
+
+ @Override
+ public final boolean hasFailed() {
+ return failure != null;
+ }
+
+ @Override
+ public final Throwable getFailure() {
+ return failure;
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
index d7ec4e9..b2a1ff2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/VirtualPage.java
@@ -147,4 +147,14 @@
str.append("}");
return str.toString();
}
+
+ @Override
+ public void setFailureCallback(IPageWriteFailureCallback callback) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeFailed(Exception e) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
index c77bea0..853a2ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/org/apache/hyracks/storage/am/btree/DiskBTreeSearchCursorTest.java
@@ -183,6 +183,9 @@
bulkloader.add(tuple);
}
bulkloader.end();
+ if (bulkloader.hasFailed()) {
+ throw HyracksDataException.create(bulkloader.getFailure());
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
index f94914c..d0d02a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/BufferCacheTest.java
@@ -33,7 +33,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -41,6 +40,7 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -96,7 +96,7 @@
long dpid = BufferedFileHandle.getDiskPageId(fileId, i);
ICachedPage page = bufferCache.confiscatePage(dpid);
page.getBuffer().putInt(0, i);
- bufferCache.createFIFOQueue().put(page);
+ bufferCache.createFIFOQueue().put(page, HaltOnFailureCallback.INSTANCE);
}
bufferCache.finishQueue();
bufferCache.closeFile(fileId);