Merge branch 'gerrit/mad-hatter'
Change-Id: I1ed2ad00b1fb6ef5fe70e7a1d6a753d8da59e269
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 17a4f46..9802001 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -20,7 +20,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.LogManager;
@@ -42,8 +41,6 @@
@Override
public void operationFailed(ILSMIOOperation operation, Throwable t) {
LOGGER.error("Operation {} has failed", operation, t);
- if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
- ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
- }
+ ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 00d2d3d..50118fc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -171,6 +171,59 @@
Assert.assertFalse(indexMetadataMaskFile.exists());
}
+ @Test
+ public void deleteMaskedMergedFiles() throws Exception {
+ final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+ final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+ final String datasetName = "ds";
+ TestDataUtil.createIdOnlyDataset(datasetName);
+ final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName);
+ final String indexPath = TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId);
+ FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
+ int compSeqStart = 100;
+ int validComponentSequence = 103;
+ // advance valid component seq in checkpoint
+ PersistentLocalResourceRepository localResourceRepository =
+ (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository();
+ LocalResource localResource = localResourceRepository.get(indexPath);
+ DatasetResourceReference drr = DatasetResourceReference.of(localResource);
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
+ IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
+ indexCheckpointManager.advanceValidComponentSequence(validComponentSequence);
+ // create components to be merged
+ String btree = "_b";
+ String filter = "_f";
+ String indexDir = indexDirRef.getFile().getAbsolutePath();
+ for (int i = compSeqStart; i <= validComponentSequence; i++) {
+ String componentId = i + "_" + i;
+ Path btreePath = Paths.get(indexDir, componentId + btree);
+ Path filterPath = Paths.get(indexDir, componentId + filter);
+ Files.createFile(btreePath);
+ Files.createFile(filterPath);
+ }
+ // create masked merged component
+ String mergedComponentId = compSeqStart + "_" + validComponentSequence;
+ Path mergedBtreePath = Paths.get(indexDir, mergedComponentId + btree);
+ Path mergedFilterPath = Paths.get(indexDir, mergedComponentId + filter);
+ Path mergeMaskPath = Paths.get(indexDir, StorageConstants.COMPONENT_MASK_FILE_PREFIX + mergedComponentId);
+ Files.createFile(mergedBtreePath);
+ Files.createFile(mergedFilterPath);
+ Files.createFile(mergeMaskPath);
+ // cleanup storage and ensure merged component files were deleted while individual files still exist
+ DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource();
+ localResourceRepository.cleanup(lr.getPartition());
+ Assert.assertFalse(mergedBtreePath.toFile().exists());
+ Assert.assertFalse(mergedFilterPath.toFile().exists());
+ Assert.assertFalse(mergeMaskPath.toFile().exists());
+ for (int i = compSeqStart; i <= validComponentSequence; i++) {
+ String componentId = i + "_" + i;
+ Path btreePath = Paths.get(indexDir, componentId + btree);
+ Path filterPath = Paths.get(indexDir, componentId + filter);
+ Assert.assertTrue(btreePath.toFile().exists());
+ Assert.assertTrue(filterPath.toFile().exists());
+ }
+ }
+
private void ensureInvalidComponentDeleted(String indexDir, String componentSeq,
PersistentLocalResourceRepository localResourceRepository, DatasetLocalResource lr) throws IOException {
Path btreePath = Paths.get(indexDir,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index e3bd13d..45594eb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@ -463,7 +463,7 @@
scheduleFlush();
} catch (Throwable e) {
LOGGER.error("Unexpected exception when trying to schedule flushes.", e);
- ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
+ ExitUtil.halt(ExitUtil.EC_IO_SCHEDULER_FAILED);
}
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 0aa46a8..c3737da 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -19,6 +19,10 @@
package org.apache.asterix.common.ioopcallbacks;
+import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
@@ -28,8 +32,10 @@
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -83,7 +89,15 @@
@Override
public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException {
- // No Op
+ if (isMerge(operation)) {
+ FileReference operationMaskFilePath = getOperationMaskFilePath(operation);
+ // if a merge operation is attempted after a failure, its mask file may already exists
+ if (!operationMaskFilePath.getFile().exists()) {
+ IoUtil.create(operationMaskFilePath);
+ } else {
+ LOGGER.warn("merge operation mask file {} already exists", operationMaskFilePath);
+ }
+ }
}
@Override
@@ -121,6 +135,8 @@
} else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH
|| operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
addComponentToCheckpoint(operation);
+ } else if (isMerge(operation)) {
+ IoUtil.delete(getOperationMaskFilePath(operation));
}
}
@@ -277,4 +293,18 @@
public void allocated(ILSMMemoryComponent component) throws HyracksDataException {
// no op
}
+
+ private boolean isMerge(ILSMIOOperation operation) {
+ return operation.getIOOpertionType() == LSMIOOperationType.MERGE
+ && operation.getAccessor().getOpContext().getOperation() != IndexOperation.DELETE_COMPONENTS;
+ }
+
+ private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) {
+ FileReference target = operation.getTarget();
+ final String componentSequence = getComponentSequence(target.getFile().getAbsolutePath());
+ Path idxRelPath = Paths.get(target.getRelativePath()).getParent();
+ Path maskFileRelPath =
+ Paths.get(idxRelPath.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
+ return new FileReference(target.getDeviceHandle(), maskFileRelPath.toString());
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 9d934e4..c3b6229 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
public class ResourceReference {
@@ -126,4 +127,17 @@
public String toString() {
return getRelativePath().toString();
}
+
+ /**
+ * Gets a component sequence based on its unique timestamp.
+ * e.g. a component file 1_3_b
+ * will return a component sequence 1_3
+ *
+ * @param componentFile any component file
+ * @return The component sequence
+ */
+ public static String getComponentSequence(String componentFile) {
+ final ResourceReference ref = of(componentFile);
+ return IndexComponentFileReference.of(ref.getName()).getSequence();
+ }
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 181a75b..3f04bd2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -28,10 +28,10 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
@@ -63,7 +63,7 @@
final IIOManager ioManager = appCtx.getIoManager();
final FileReference localPath = ioManager.resolve(componentFile);
final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
- final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile);
+ final String componentSequence = ResourceReference.getComponentSequence(componentFile);
return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 31f5171..145be86 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.transaction.management.resource;
+import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
@@ -593,19 +594,6 @@
return Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), METADATA_FILE_MASK_NAME);
}
- /**
- * Gets a component sequence based on its unique timestamp.
- * e.g. a component file 1_3_b
- * will return a component sequence 1_3
- *
- * @param componentFile any component file
- * @return The component sequence
- */
- public static String getComponentSequence(String componentFile) {
- final ResourceReference ref = ResourceReference.of(componentFile);
- return IndexComponentFileReference.of(ref.getName()).getSequence();
- }
-
private static boolean isComponentMask(File mask) {
return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index f8bc9f9..4991f86 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -58,7 +58,7 @@
public static final int EC_IO_SCHEDULER_FAILED = 55;
public static final int EC_HALT_SHUTDOWN_TIMED_OUT = 66;
public static final int EC_HALT_WATCHDOG_FAILED = 77;
- public static final int EC_FLUSH_FAILED = 88;
+ public static final int EC_IO_OPERATION_FAILED = 88;
public static final int EC_TERMINATE_NC_SERVICE_DIRECTIVE = 99;
private static final ExitThread exitThread = new ExitThread();
private static final ShutdownWatchdog watchdogThread = new ShutdownWatchdog();
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
index 183cb6f..bc33e6d 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
@@ -21,18 +21,26 @@
import java.util.concurrent.TimeUnit;
public class Span {
- private final long startNanos;
private final long spanNanos;
+ private volatile long startNanos;
private Span(long span, TimeUnit unit) {
- startNanos = System.nanoTime();
spanNanos = unit.toNanos(span);
+ reset();
+ }
+
+ public void reset() {
+ startNanos = System.nanoTime();
}
public long getSpanNanos() {
return spanNanos;
}
+ public long getSpan(TimeUnit unit) {
+ return unit.convert(spanNanos, TimeUnit.NANOSECONDS);
+ }
+
public static Span start(long span, TimeUnit unit) {
return new Span(span, unit);
}
@@ -46,6 +54,15 @@
}
/**
+ * Sleep for the remainder of this span
+ *
+ * @throws InterruptedException
+ */
+ public void sleep() throws InterruptedException {
+ TimeUnit.NANOSECONDS.sleep(remaining(TimeUnit.NANOSECONDS));
+ }
+
+ /**
* Sleep for the minimum of the duration or the remaining of this span
*
* @param sleep