[NO ISSUE][STO] Misc Storage Fixes and Improvements
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- This change introduces some improvements to storage
operations.
- Local RecoveryManager is now extensible.
- Bulk loaders now call the IO callback similar to
Flushes, making them less special and creating a
unified lifecycle for adding an index component.
- As a result, the IndexCheckpointManager doesn't need
to have special treatment for components loaded
through the bulk load operation.
- Component Ids have been added to the index checkpoint
files.
- Cleanup for the code of local recovery for failed flush
operations.
- Ensure that after local recovery of flushes, primary
and secondary indexes have the same index for mutable
memory component.
- The use of WAIT logs to ensure in-flight flushes
are scheduled didn't work as expected. A new log type
WAIT_FOR_FLUSHES was introduced to achieve the expected
behavior.
- The local test framework was made extensible to support
more use cases.
- Test cases were added for component ids in checkpoint files.
The following scenarios were covered:
- Primary and secondary both have values when a flush is
scheduled.
- Primary has values but not secondary when a flush is
scheduled.
- Primary is empty and an index is created through bulk
load.
- Primary has a single component and secondary is created
through bulk load.
- Primary has multiple components and secondary is created
through bulk load.
- Each primary opTracker now keeps a list of ongoing flushes.
- FlushDataset now waits for flushes only and
not all IO operations.
- Previously, we had many flushes scheduled on open datasets.
This was not detected but after this change, a failure
is thrown in such cases.
- Flush operations don't need to extend the comparable
interface anymore since they are FIFO per index.
Change-Id: If24c9baaac2b79e7d1acf47fa2601767388ce988
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2632
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index a012f1e..c1a81a3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -22,6 +22,7 @@
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -54,29 +55,36 @@
}
@Override
- public synchronized void init(long lsn) throws HyracksDataException {
- final List<IndexCheckpoint> checkpoints = getCheckpoints();
+ public synchronized void init(String lastComponentTimestamp, long lsn) throws HyracksDataException {
+ List<IndexCheckpoint> checkpoints;
+ try {
+ checkpoints = getCheckpoints();
+ } catch (ClosedByInterruptException e) {
+ throw HyracksDataException.create(e);
+ }
if (!checkpoints.isEmpty()) {
LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
delete();
}
- IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lsn);
+ IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lastComponentTimestamp, lsn);
persist(firstCheckpoint);
}
@Override
- public synchronized void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException {
+ public synchronized void replicated(String componentTimestamp, long masterLsn, long componentId)
+ throws HyracksDataException {
final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
if (localLsn == null) {
throw new IllegalStateException("Component flushed before lsn mapping was received");
}
- flushed(componentTimestamp, localLsn);
+ flushed(componentTimestamp, localLsn, componentId);
}
@Override
- public synchronized void flushed(String componentTimestamp, long lsn) throws HyracksDataException {
+ public synchronized void flushed(String componentTimestamp, long lsn, long componentId)
+ throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
- IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp);
+ IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp, componentId);
persist(nextCheckpoint);
deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
}
@@ -85,19 +93,19 @@
public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
- final IndexCheckpoint next =
- IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentTimestamp());
+ final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
+ latest.getValidComponentTimestamp(), latest.getLastComponentId());
persist(next);
notifyAll();
}
@Override
- public synchronized long getLowWatermark() {
+ public synchronized long getLowWatermark() throws HyracksDataException {
return getLatest().getLowWatermark();
}
@Override
- public synchronized boolean isFlushed(long masterLsn) {
+ public synchronized boolean isFlushed(long masterLsn) throws HyracksDataException {
if (masterLsn == BULKLOAD_LSN) {
return true;
}
@@ -110,18 +118,28 @@
}
@Override
- public Optional<String> getValidComponentTimestamp() {
- final String validComponentTimestamp = getLatest().getValidComponentTimestamp();
+ public Optional<String> getValidComponentTimestamp() throws HyracksDataException {
+ String validComponentTimestamp = getLatest().getValidComponentTimestamp();
return validComponentTimestamp != null ? Optional.of(validComponentTimestamp) : Optional.empty();
}
@Override
- public int getCheckpointCount() {
- return getCheckpoints().size();
+ public int getCheckpointCount() throws HyracksDataException {
+ try {
+ return getCheckpoints().size();
+ } catch (ClosedByInterruptException e) {
+ throw HyracksDataException.create(e);
+ }
}
- private IndexCheckpoint getLatest() {
- final List<IndexCheckpoint> checkpoints = getCheckpoints();
+ @Override
+ public synchronized IndexCheckpoint getLatest() throws HyracksDataException {
+ List<IndexCheckpoint> checkpoints;
+ try {
+ checkpoints = getCheckpoints();
+ } catch (ClosedByInterruptException e) {
+ throw HyracksDataException.create(e);
+ }
if (checkpoints.isEmpty()) {
throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
}
@@ -129,13 +147,34 @@
return checkpoints.get(0);
}
- private List<IndexCheckpoint> getCheckpoints() {
+ @Override
+ public synchronized void setLastComponentId(long componentId) throws HyracksDataException {
+ final IndexCheckpoint latest = getLatest();
+ final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
+ latest.getValidComponentTimestamp(), componentId);
+ persist(next);
+ }
+
+ @Override
+ public synchronized void advanceValidComponentTimestamp(String timestamp) throws HyracksDataException {
+ final IndexCheckpoint latest = getLatest();
+ if (latest.getValidComponentTimestamp() == null
+ || timestamp.compareTo(latest.getValidComponentTimestamp()) > 0) {
+ final IndexCheckpoint next =
+ IndexCheckpoint.next(latest, latest.getLowWatermark(), timestamp, latest.getLastComponentId());
+ persist(next);
+ }
+ }
+
+ private List<IndexCheckpoint> getCheckpoints() throws ClosedByInterruptException {
List<IndexCheckpoint> checkpoints = new ArrayList<>();
final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
if (checkpointFiles != null) {
for (File checkpointFile : checkpointFiles) {
try {
checkpoints.add(read(checkpointFile.toPath()));
+ } catch (ClosedByInterruptException e) {
+ throw e;
} catch (IOException e) {
LOGGER.warn(() -> "Couldn't read index checkpoint file: " + checkpointFile, e);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
index 19ad8f6..e0b3105 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
@@ -53,14 +54,10 @@
private IndexCheckpointManager create(ResourceReference ref) {
try {
- final Path indexPath = getIndexPath(ref);
+ final Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
return new IndexCheckpointManager(indexPath);
} catch (HyracksDataException e) {
throw new IllegalStateException(e);
}
}
-
- private Path getIndexPath(ResourceReference indexRef) throws HyracksDataException {
- return ioManager.resolve(indexRef.getRelativePath().toString()).getFile().toPath();
- }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index c3b1bae..c3201c3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -60,6 +60,7 @@
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.file.StorageComponentProvider;
@@ -168,7 +169,7 @@
}
@Override
- public void initialize(boolean initialRun) throws IOException {
+ public void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
threadExecutor =
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -183,7 +184,7 @@
new PersistentLocalResourceRepositoryFactory(ioManager, indexCheckpointManagerProvider);
localResourceRepository =
(PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
- txnSubsystem = new TransactionSubsystem(this);
+ txnSubsystem = new TransactionSubsystem(this, recoveryManagerFactory);
IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
SystemState systemState = recoveryMgr.getSystemState();
if (initialRun || systemState == SystemState.PERMANENT_DATA_LOSS) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index efd173f..5e8a5e8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -77,6 +77,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
@@ -102,14 +104,14 @@
private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
private Map<Long, JobEntityCommits> jobId2WinnerEntitiesMap = null;
private final long cachedEntityCommitsPerJobSize;
- private final PersistentLocalResourceRepository localResourceRepository;
+ protected final PersistentLocalResourceRepository localResourceRepository;
private final ICheckpointManager checkpointManager;
private SystemState state;
- private final INCServiceContext serviceCtx;
- private final INcApplicationContext appCtx;
+ protected final INCServiceContext serviceCtx;
+ protected final INcApplicationContext appCtx;
private static final TxnId recoveryTxnId = new TxnId(-1);
- public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) {
+ public RecoveryManager(INCServiceContext serviceCtx, ITransactionSubsystem txnSubsystem) {
this.serviceCtx = serviceCtx;
this.txnSubsystem = txnSubsystem;
this.appCtx = txnSubsystem.getApplicationContext();
@@ -225,6 +227,7 @@
break;
case LogType.FLUSH:
case LogType.WAIT:
+ case LogType.WAIT_FOR_FLUSHES:
case LogType.MARKER:
case LogType.FILTER:
break;
@@ -392,7 +395,6 @@
logRecord = logReader.next();
continue;
}
- idGenerator.refresh();
DatasetInfo dsInfo = datasetLifecycleManager.getDatasetInfo(datasetId);
// we only need to flush open indexes here (opened by previous update records)
// if an index has no ongoing updates, then it's memory component must be empty
@@ -401,23 +403,15 @@
if (iInfo.isOpen() && iInfo.getPartition() == partition) {
maxDiskLastLsn = resourceId2MaxLSNMap.get(iInfo.getResourceId());
index = iInfo.getIndex();
- LSMIOOperationCallback ioCallback =
- (LSMIOOperationCallback) index.getIOOperationCallback();
if (logRecord.getLSN() > maxDiskLastLsn
&& !index.isCurrentMutableComponentEmpty()) {
// schedule flush
- redoFlush(index, logRecord, idGenerator.getId());
+ redoFlush(index, logRecord);
redoCount++;
} else {
- if (index.isMemoryComponentsAllocated()) {
- // if the memory component has been allocated, we
- // force it to receive the same Id
- index.getCurrentMemoryComponent().resetId(idGenerator.getId(), true);
- } else {
- // otherwise, we refresh the id stored in ioCallback
- // to ensure the memory component receives correct Id upon activation
- ioCallback.forceRefreshNextId(idGenerator.getId());
- }
+ // otherwise, do nothing since this component had no records when flush was
+ // scheduled.. TODO: update checkpoint file? and do the
+ // lsn checks from the checkpoint file
}
}
}
@@ -427,6 +421,7 @@
case LogType.ENTITY_COMMIT:
case LogType.ABORT:
case LogType.WAIT:
+ case LogType.WAIT_FOR_FLUSHES:
case LogType.MARKER:
//do nothing
break;
@@ -683,6 +678,7 @@
case LogType.FLUSH:
case LogType.FILTER:
case LogType.WAIT:
+ case LogType.WAIT_FOR_FLUSHES:
case LogType.MARKER:
//ignore
break;
@@ -822,8 +818,7 @@
}
}
- private static void redoFlush(ILSMIndex index, ILogRecord logRecord, ILSMComponentId nextId)
- throws HyracksDataException {
+ private static void redoFlush(ILSMIndex index, ILogRecord logRecord) throws HyracksDataException {
long flushLsn = logRecord.getLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -832,7 +827,7 @@
long minId = logRecord.getFlushingComponentMinId();
long maxId = logRecord.getFlushingComponentMaxId();
ILSMComponentId id = new LSMComponentId(minId, maxId);
- flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextId);
+ flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, index.getCurrentMemoryComponent().getId());
if (!index.getDiskComponents().isEmpty()) {
ILSMDiskComponent diskComponent = index.getDiskComponents().get(0);
ILSMComponentId maxDiskComponentId = diskComponent.getId();
@@ -842,7 +837,17 @@
}
}
index.getCurrentMemoryComponent().resetId(id, true);
- accessor.scheduleFlush();
+ ILSMIOOperation flush = accessor.scheduleFlush();
+ try {
+ flush.sync();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ if (flush.getStatus() == LSMIOOperationStatus.FAILURE) {
+ throw HyracksDataException.create(flush.getFailure());
+ }
+ index.resetCurrentComponentIndex();
}
private class JobEntityCommits {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index 8158096..79c87c0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.transactions.ILockManager;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
@@ -59,7 +60,7 @@
private long profilerEntityCommitLogCount = 0;
private EntityCommitProfiler ecp;
- public TransactionSubsystem(INcApplicationContext appCtx) {
+ public TransactionSubsystem(INcApplicationContext appCtx, IRecoveryManagerFactory recoveryManagerFactory) {
this.appCtx = appCtx;
this.id = appCtx.getServiceContext().getNodeId();
this.txnProperties = appCtx.getTransactionProperties();
@@ -78,7 +79,7 @@
}
this.logManager = replicationEnabled ? new LogManagerWithReplication(this) : new LogManager(this);
- this.recoveryManager = new RecoveryManager(this, appCtx.getServiceContext());
+ this.recoveryManager = recoveryManagerFactory.createRecoveryManager(appCtx.getServiceContext(), this);
if (txnProperties.isCommitProfilerEnabled()) {
ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval());
((ExecutorService) appCtx.getThreadExecutor()).submit(ecp);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 494198b..b8c7e15 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -27,6 +27,7 @@
import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.api.http.server.StorageApiServlet;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.app.nc.RecoveryManager;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -44,6 +45,7 @@
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
import org.apache.asterix.common.utils.PrintUtil;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.common.utils.StorageConstants;
@@ -121,7 +123,7 @@
}
updateOnNodeJoin();
}
- runtimeContext.initialize(runtimeContext.getNodeProperties().isInitialRun());
+ runtimeContext.initialize(getRecoveryManagerFactory(), runtimeContext.getNodeProperties().isInitialRun());
MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
this.ncServiceCtx.setMessageBroker(messageBroker);
@@ -144,6 +146,10 @@
performLocalCleanUp();
}
+ protected IRecoveryManagerFactory getRecoveryManagerFactory() {
+ return RecoveryManager::new;
+ }
+
@Override
protected void configureLoggingLevel(Level level) {
super.configureLoggingLevel(level);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index cc70d57..4b245a8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -97,7 +97,8 @@
* main method to run a simple 2 node cluster in-process
* suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code>
*
- * @param args unused
+ * @param args
+ * unused
*/
public static void main(String[] args) throws Exception {
TestUtils.redirectLoggingToConsole();
@@ -226,11 +227,14 @@
return ncConfig;
}
- protected INCApplication createNCApplication() {
+ protected INCApplication createNCApplication()
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+ // Instead of using this flag, RecoveryManagerTest should set the desired class in its config file
if (!gracefulShutdown) {
return new UngracefulShutdownNCApplication();
}
- return new NCApplication();
+ String ncAppClass = (String) configManager.get(NCConfig.Option.APP_CLASS);
+ return (INCApplication) Class.forName(ncAppClass).newInstance();
}
private NCConfig fixupIODevices(NCConfig ncConfig) throws IOException, AsterixException, CmdLineException {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 5cda9f2..5be349e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -37,7 +37,6 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
@@ -58,17 +57,18 @@
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
-import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.runtime.CommitRuntime;
import org.apache.asterix.transaction.management.service.logging.LogReader;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -166,7 +166,7 @@
public void deInit(boolean cleanupOnStop) throws Exception {
ExternalUDFLibrarian.removeLibraryDir();
- ExecutionTestUtil.tearDown(cleanupOnStop);
+ ExecutionTestUtil.tearDown(cleanupOnStop, runHDFS);
}
public void setOpts(List<Pair<IOption, Object>> opts) {
@@ -186,7 +186,7 @@
return new TxnId(jobId.getId());
}
- public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
+ public Pair<LSMInsertDeleteOperatorNodePushable, IPushRuntime> getInsertPipeline(IHyracksTaskContext ctx,
Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
StorageComponentProvider storageComponentProvider, Index secondaryIndex)
@@ -195,7 +195,39 @@
primaryKeyIndicators, storageComponentProvider, secondaryIndex, IndexOperation.INSERT);
}
- public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime> getInsertPipeline(IHyracksTaskContext ctx,
+ public Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> getBulkLoadSecondaryOperator(
+ IHyracksTaskContext ctx, Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
+ ARecordType metaType, int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
+ StorageComponentProvider storageComponentProvider, Index secondaryIndex, int numElementsHint)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ try {
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+ mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
+ SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
+ IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory(
+ storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider);
+ IIndexDataflowHelperFactory primaryIndexHelperFactory = new IndexDataflowHelperFactory(
+ storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+ int[] fieldPermutation = new int[secondaryIndex.getKeyFieldNames().size()];
+ for (int i = 0; i < fieldPermutation.length; i++) {
+ fieldPermutation[i] = i;
+ }
+ LSMIndexBulkLoadOperatorNodePushable op =
+ new LSMIndexBulkLoadOperatorNodePushable(secondaryIndexHelperFactory, primaryIndexHelperFactory,
+ ctx, 0, fieldPermutation, 1.0F, false, numElementsHint, true, secondaryIndexInfo.rDesc,
+ BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId());
+ op.setOutputFrameWriter(0, new SinkRuntimeFactory().createPushRuntime(ctx)[0], null);
+ return Pair.of(secondaryIndexInfo, op);
+ } catch (Throwable th) {
+ throw HyracksDataException.create(th);
+ }
+ }
+
+ public Pair<LSMInsertDeleteOperatorNodePushable, IPushRuntime> getInsertPipeline(IHyracksTaskContext ctx,
Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields,
int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
StorageComponentProvider storageComponentProvider, Index secondaryIndex, IndexOperation op)
@@ -210,10 +242,8 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators);
- IModificationOperationCallbackFactory modOpCallbackFactory =
- new PrimaryIndexModificationOperationCallbackFactory(dataset.getDatasetId(),
- primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
- ResourceType.LSM_BTREE);
+ IModificationOperationCallbackFactory modOpCallbackFactory = dataset.getModificationCallbackFactory(
+ storageComponentProvider, primaryIndexInfo.index, op, primaryIndexInfo.primaryKeyIndexes);
IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
RecordDescriptor recordDesc =
recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0);
@@ -272,16 +302,18 @@
secondaryIndexInfo.insertFieldsPermutations, secondaryIndexInfo.rDesc, op, false,
secondaryIndexHelperFactory, secondaryModCallbackFactory, null);
assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc);
- CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
- secondaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
- true);
+
+ IPushRuntime commitOp =
+ dataset.getCommitRuntimeFactory(mdProvider, secondaryIndexInfo.primaryKeyIndexes, true)
+ .createPushRuntime(ctx)[0];
+
secondaryInsertOp.setOutputFrameWriter(0, commitOp, secondaryIndexInfo.rDesc);
commitOp.setInputRecordDescriptor(0, secondaryIndexInfo.rDesc);
return Pair.of(insertOp, commitOp);
} else {
- CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
- primaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
- true);
+ IPushRuntime commitOp =
+ dataset.getCommitRuntimeFactory(mdProvider, primaryIndexInfo.primaryKeyIndexes, true)
+ .createPushRuntime(ctx)[0];
insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
return Pair.of(insertOp, commitOp);
@@ -380,7 +412,6 @@
(ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse);
SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
try {
-
IResourceFactory resourceFactory = primaryIndexInfo.dataset.getResourceFactory(mdProvider, secondaryIndex,
primaryIndexInfo.recordType, primaryIndexInfo.metaType, mergePolicy.first, mergePolicy.second);
IndexBuilderFactory indexBuilderFactory =
@@ -419,9 +450,9 @@
secondaryIndexSerdes[i] =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(secondaryKeyTypes[i]);
}
- for (; i < primaryKeyTypes.length; i++) {
- secondaryIndexSerdes[i] =
- SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+ for (; i < secondaryKeyTypes.length + primaryKeyTypes.length; i++) {
+ secondaryIndexSerdes[i] = SerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(primaryKeyTypes[i - secondaryKeyTypes.length]);
}
return secondaryIndexSerdes;
}
@@ -447,8 +478,9 @@
for (; i < secondaryKeyTypes.length; i++) {
secondaryIndexTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(secondaryKeyTypes[i]);
}
- for (; i < primaryKeyTypes.length; i++) {
- secondaryIndexTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+ for (; i < secondaryKeyTypes.length + primaryKeyTypes.length; i++) {
+ secondaryIndexTypeTraits[i] =
+ TypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i - secondaryKeyTypes.length]);
}
return secondaryIndexTypeTraits;
}
@@ -491,12 +523,14 @@
}
public static class SecondaryIndexInfo {
- private final int[] primaryKeyIndexes;
- private final PrimaryIndexInfo primaryIndexInfo;
- private final Index secondaryIndex;
- private final ConstantFileSplitProvider fileSplitProvider;
- private final RecordDescriptor rDesc;
- private final int[] insertFieldsPermutations;
+ final int[] primaryKeyIndexes;
+ final PrimaryIndexInfo primaryIndexInfo;
+ final Index secondaryIndex;
+ final ConstantFileSplitProvider fileSplitProvider;
+ final ISerializerDeserializer<?>[] secondaryIndexSerdes;
+ final RecordDescriptor rDesc;
+ final int[] insertFieldsPermutations;
+ final ITypeTraits[] secondaryIndexTypeTraits;
public SecondaryIndexInfo(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex) {
this.primaryIndexInfo = primaryIndexInfo;
@@ -507,11 +541,11 @@
FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(),
primaryIndexInfo.dataset, secondaryIndex.getIndexName(), nodes);
fileSplitProvider = new ConstantFileSplitProvider(splits);
- ITypeTraits[] secondaryIndexTypeTraits = createSecondaryIndexTypeTraits(primaryIndexInfo.recordType,
+ secondaryIndexTypeTraits = createSecondaryIndexTypeTraits(primaryIndexInfo.recordType,
primaryIndexInfo.metaType, primaryIndexInfo.primaryKeyTypes,
secondaryIndex.getKeyFieldTypes().toArray(new IAType[secondaryIndex.getKeyFieldTypes().size()]));
- ISerializerDeserializer<?>[] secondaryIndexSerdes = createSecondaryIndexSerdes(primaryIndexInfo.recordType,
- primaryIndexInfo.metaType, primaryIndexInfo.primaryKeyTypes,
+ secondaryIndexSerdes = createSecondaryIndexSerdes(primaryIndexInfo.recordType, primaryIndexInfo.metaType,
+ primaryIndexInfo.primaryKeyTypes,
secondaryIndex.getKeyFieldTypes().toArray(new IAType[secondaryIndex.getKeyFieldTypes().size()]));
rDesc = new RecordDescriptor(secondaryIndexSerdes, secondaryIndexTypeTraits);
insertFieldsPermutations = new int[secondaryIndexTypeTraits.length];
@@ -527,6 +561,10 @@
public IFileSplitProvider getFileSplitProvider() {
return fileSplitProvider;
}
+
+ public ISerializerDeserializer<?>[] getSerdes() {
+ return secondaryIndexSerdes;
+ }
}
public static class PrimaryIndexInfo {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
index 2eba473..ff027e2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
@@ -22,7 +22,7 @@
import java.io.IOException;
import java.util.Random;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.om.types.ATypeTag;
public class ABooleanFieldValueGenerator implements IAsterixFieldValueGenerator<Boolean> {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
index e698676..64dab3d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.Random;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.om.types.ATypeTag;
public class ADoubleFieldValueGenerator implements IAsterixFieldValueGenerator<Double> {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
index 7c6556b..5540c11 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.Random;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.om.types.ATypeTag;
public class AInt32FieldValueGenerator implements IAsterixFieldValueGenerator<Integer> {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
index 2a2496e..e62054a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.Random;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.om.types.ATypeTag;
public class AInt64FieldValueGenerator implements IAsterixFieldValueGenerator<Long> {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
index cd4de62..b242189 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
@@ -21,7 +21,7 @@
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
index 5ee6d40..419d1b6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.Random;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import org.apache.hyracks.util.string.UTF8StringReader;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/RecordTupleGenerator.java
similarity index 96%
rename from asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/RecordTupleGenerator.java
index 0469349..1b04c1a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/RecordTupleGenerator.java
@@ -26,7 +26,7 @@
import org.apache.asterix.test.common.TestTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-public class TupleGenerator {
+public class RecordTupleGenerator {
private final int[] keyIndexes;
private final int[] keyIndicators;
@@ -52,7 +52,7 @@
* @param metaGeneration
* @param uniqueMetaFields
*/
- public TupleGenerator(ARecordType recordType, ARecordType metaType, int[] keyIndexes, int[] keyIndicators,
+ public RecordTupleGenerator(ARecordType recordType, ARecordType metaType, int[] keyIndexes, int[] keyIndicators,
GenerationFunction[] recordGeneration, boolean[] uniqueRecordFields, GenerationFunction[] metaGeneration,
boolean[] uniqueMetaFields) {
this.keyIndexes = keyIndexes;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/AInt32ValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/AInt32ValueGenerator.java
new file mode 100644
index 0000000..c34f5a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/AInt32ValueGenerator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import org.apache.asterix.om.base.AInt32;
+import org.apache.hyracks.storage.am.common.datagen.IFieldValueGenerator;
+
+public class AInt32ValueGenerator implements IFieldValueGenerator<AInt32> {
+ int counter = 0;
+
+ @Override
+ public AInt32 next() {
+ return new AInt32(counter++);
+ }
+
+ @Override
+ public void reset() {
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/AInt64ValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/AInt64ValueGenerator.java
new file mode 100644
index 0000000..b860737
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/AInt64ValueGenerator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import org.apache.asterix.om.base.AInt64;
+import org.apache.hyracks.storage.am.common.datagen.IFieldValueGenerator;
+
+public class AInt64ValueGenerator implements IFieldValueGenerator<AInt64> {
+ long counter = 0L;
+
+ @Override
+ public AInt64 next() {
+ return new AInt64(counter++);
+ }
+
+ @Override
+ public void reset() {
+ }
+
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
new file mode 100644
index 0000000..a0ed26e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.datagen.TupleGenerator;
+import org.apache.hyracks.storage.am.common.datagen.IFieldValueGenerator;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class CheckpointInSecondaryIndexTest {
+ static final int REPREAT_TEST_COUNT = 1;
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]);
+ }
+
+ private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+ { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+ private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+ private static final ARecordType META_TYPE = null;
+ private static final GenerationFunction[] META_GEN_FUNCTION = null;
+ private static final boolean[] UNIQUE_META_FIELDS = null;
+ private static final int[] KEY_INDEXES = { 0 };
+ private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+ private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+ private static final int RECORDS_PER_COMPONENT = 500;
+ private static final int DATASET_ID = 101;
+ private static final String DATAVERSE_NAME = "TestDV";
+ private static final String DATASET_NAME = "TestDS";
+ private static final String INDEX_NAME = "TestIdx";
+ private static final String DATA_TYPE_NAME = "DUMMY";
+ private static final String NODE_GROUP_NAME = "DEFAULT";
+ private static final IndexType INDEX_TYPE = IndexType.BTREE;
+ private static final IFieldValueGenerator[] SECONDARY_INDEX_VALUE_GENERATOR =
+ { new AInt64ValueGenerator(), new AInt32ValueGenerator() };
+ private static final List<List<String>> INDEX_FIELD_NAMES =
+ Arrays.asList(Arrays.asList(RECORD_TYPE.getFieldNames()[1]));
+ private static final List<Integer> INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR);
+ private static final List<IAType> INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);
+ private static final StorageComponentProvider storageManager = new StorageComponentProvider();
+ private static TestNodeController nc;
+ private static NCAppRuntimeContext ncAppCtx;
+ private static IDatasetLifecycleManager dsLifecycleMgr;
+ private static Dataset dataset;
+ private static Index secondaryIndex;
+ private static ITransactionContext txnCtx;
+ private static TestLsmBtree primaryLsmBtree;
+ private static TestLsmBtree secondaryLsmBtree;
+ private static PrimaryIndexInfo primaryIndexInfo;
+ private static IHyracksTaskContext taskCtx;
+ private static IIndexDataflowHelper primaryIndexDataflowHelper;
+ private static IIndexDataflowHelper secondaryIndexDataflowHelper;
+ private static LSMInsertDeleteOperatorNodePushable insertOp;
+ private static LSMIndexBulkLoadOperatorNodePushable indexLoadOp;
+ private static IHyracksTaskContext loadTaskCtx;
+ private static SecondaryIndexInfo secondaryIndexInfo;
+ private static Actor actor;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+ + File.separator + "resources" + File.separator + "cc-multipart.conf";
+ nc = new TestNodeController(configPath, false);
+ nc.init();
+ ncAppCtx = nc.getAppRuntimeContext();
+ dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ System.out.println("TearDown");
+ nc.deInit();
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Before
+ public void createIndex() throws Exception {
+ List<List<String>> partitioningKeys = new ArrayList<>();
+ partitioningKeys.add(Collections.singletonList("key"));
+ dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+ NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
+ PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
+ secondaryIndex = new Index(DATAVERSE_NAME, DATASET_NAME, INDEX_NAME, INDEX_TYPE, INDEX_FIELD_NAMES,
+ INDEX_FIELD_INDICATORS, INDEX_FIELD_TYPES, false, false, false, 0);
+ taskCtx = null;
+ primaryIndexDataflowHelper = null;
+ secondaryIndexDataflowHelper = null;
+ primaryLsmBtree = null;
+ insertOp = null;
+ JobId jobId = nc.newJobId();
+ txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(jobId),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ actor = null;
+ taskCtx = nc.createTestContext(jobId, 0, false);
+ primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager,
+ KEY_INDEXES, KEY_INDICATORS_LIST, 0);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+ primaryIndexDataflowHelper = iHelperFactory.create(taskCtx.getJobletContext().getServiceContext(), 0);
+ primaryIndexDataflowHelper.open();
+ primaryLsmBtree = (TestLsmBtree) primaryIndexDataflowHelper.getIndexInstance();
+ primaryIndexDataflowHelper.close();
+ // This pipeline skips the secondary index
+ insertOp = nc.getInsertPipeline(taskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, storageManager, null).getLeft();
+ actor = new Actor("player");
+ // allow all operations
+ StorageTestUtils.allowAllOps(primaryLsmBtree);
+ actor.add(new Request(Request.Action.INSERT_OPEN));
+ }
+
+ @After
+ public void destroyIndex() throws Exception {
+ Request close = new Request(Request.Action.INSERT_CLOSE);
+ actor.add(close);
+ close.await();
+ nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+ if (secondaryIndexDataflowHelper != null) {
+ secondaryIndexDataflowHelper.destroy();
+ }
+ primaryIndexDataflowHelper.destroy();
+ actor.stop();
+ }
+
+ @Test
+ public void testCheckpointUpdatedWhenSecondaryIsEmpty() throws Exception {
+ try {
+ // create secondary
+ createSecondaryIndex();
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ // search now and ensure partition 0 has all the records
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+ // and that secondary index is empty
+ Assert.assertTrue(secondaryLsmBtree.isCurrentMutableComponentEmpty());
+ // flush
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ // ensure primary has a component
+ Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+
+ // ensure secondary doesn't have a component
+ Assert.assertEquals(0, secondaryLsmBtree.getDiskComponents().size());
+ // ensure that current memory component index match
+ Assert.assertEquals(secondaryLsmBtree.getCurrentMemoryComponentIndex(),
+ primaryLsmBtree.getCurrentMemoryComponentIndex());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
+ long min = id.getMinId();
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ // secondary ref
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference.of(
+ secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ private void createSecondaryIndex()
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ SecondaryIndexInfo secondaryIndexInfo =
+ nc.createSecondaryIndex(primaryIndexInfo, secondaryIndex, storageManager, 0);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), secondaryIndexInfo.getFileSplitProvider());
+ secondaryIndexDataflowHelper = iHelperFactory.create(taskCtx.getJobletContext().getServiceContext(), 0);
+ secondaryIndexDataflowHelper.open();
+ secondaryLsmBtree = (TestLsmBtree) secondaryIndexDataflowHelper.getIndexInstance();
+ secondaryIndexDataflowHelper.close();
+ }
+
+ @Test
+ public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsSingleComponent() throws Exception {
+ try {
+ // create secondary
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ // search now and ensure partition 0 has all the records
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+ // flush
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ // ensure primary has a component
+ Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
+ long min = id.getMinId();
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ createSecondaryIndex();
+ JobId jobId = nc.newJobId();
+ loadTaskCtx = nc.createTestContext(jobId, 0, false);
+ Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+ nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+ indexLoadOp = infoAndOp.getRight();
+ secondaryIndexInfo = infoAndOp.getLeft();
+ actor.add(new Request(Request.Action.LOAD_OPEN));
+ actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
+ actor.add(new Request(Request.Action.LOAD_CLOSE));
+ ensureDone(actor);
+ latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference.of(
+ secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Test
+ public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsTwoComponents() throws Exception {
+ try {
+ // create secondary
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ // search now and ensure partition 0 has all the records
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+ // flush
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ // ensure primary has a component
+ Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
+ long min = id.getMinId();
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ Assert.assertEquals(2, primaryLsmBtree.getDiskComponents().size());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ id = (LSMComponentId) primaryDiskComponent.getId();
+ min = id.getMaxId();
+ createSecondaryIndex();
+ JobId jobId = nc.newJobId();
+ loadTaskCtx = nc.createTestContext(jobId, 0, false);
+ Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+ nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+ indexLoadOp = infoAndOp.getRight();
+ secondaryIndexInfo = infoAndOp.getLeft();
+ actor.add(new Request(Request.Action.LOAD_OPEN));
+ actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
+ actor.add(new Request(Request.Action.LOAD_CLOSE));
+ ensureDone(actor);
+ latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference.of(
+ secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Test
+ public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsEmpty() throws Exception {
+ try {
+ // ensure primary has no component
+ Assert.assertEquals(0, primaryLsmBtree.getDiskComponents().size());
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ createSecondaryIndex();
+ JobId jobId = nc.newJobId();
+ loadTaskCtx = nc.createTestContext(jobId, 0, false);
+ Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+ nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+ indexLoadOp = infoAndOp.getRight();
+ secondaryIndexInfo = infoAndOp.getLeft();
+ actor.add(new Request(Request.Action.LOAD_OPEN));
+ actor.add(new Request(Request.Action.LOAD_CLOSE));
+ ensureDone(actor);
+ latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference.of(
+ secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(),
+ latestPrimaryCheckpoint.getLastComponentId());
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ @Test
+ public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsNotEmpty() throws Exception {
+ try {
+ // create secondary
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ // search now and ensure partition 0 has all the records
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+ // flush
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ // ensure primary has a component
+ Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+ // ensure both checkpoint files have the same component id as the last flushed component id
+ ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
+ long min = id.getMinId();
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); // access the private fileManager field via reflection
+ fileManagerField.setAccessible(true); // allow reflective access to the private field
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ createSecondaryIndex();
+ JobId jobId = nc.newJobId();
+ loadTaskCtx = nc.createTestContext(jobId, 0, false);
+ Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+ nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+ indexLoadOp = infoAndOp.getRight();
+ secondaryIndexInfo = infoAndOp.getLeft();
+ actor.add(new Request(Request.Action.LOAD_OPEN));
+ actor.add(new Request(Request.Action.LOAD_CLOSE));
+ ensureDone(actor);
+ latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference.of(
+ secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+ return ncAppCtx.getIndexCheckpointManagerProvider();
+ }
+
+ private void ensureDone(Actor actor) throws InterruptedException {
+ Request req = new Request(Request.Action.DUMMY);
+ actor.add(req);
+ req.await();
+ }
+
+ private static class Request {
+ enum Action {
+ DUMMY,
+ INSERT_OPEN,
+ LOAD_OPEN,
+ INSERT_PATCH,
+ INDEX_LOAD_PATCH,
+ FLUSH_DATASET,
+ INSERT_CLOSE,
+ LOAD_CLOSE,
+ }
+
+ private final Action action;
+ private volatile boolean done;
+
+ public Request(Action action) {
+ this.action = action;
+ done = false;
+ }
+
+ synchronized void complete() {
+ done = true;
+ notifyAll();
+ }
+
+ synchronized void await() throws InterruptedException {
+ while (!done) {
+ wait();
+ }
+ }
+ }
+
+ public class Actor extends SingleThreadEventProcessor<Request> {
+ private final RecordTupleGenerator primaryInsertTupleGenerator;
+ private final FrameTupleAppender tupleAppender;
+
+ public Actor(String name) throws HyracksDataException {
+ super(name);
+ primaryInsertTupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ tupleAppender = new FrameTupleAppender(new VSizeFrame(taskCtx));
+ }
+
+ @Override
+ protected void handle(Request req) throws Exception {
+ try {
+ switch (req.action) {
+ case FLUSH_DATASET:
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), false);
+ break;
+ case INSERT_CLOSE:
+ insertOp.close();
+ break;
+ case INSERT_OPEN:
+ insertOp.open();
+ break;
+ case LOAD_OPEN:
+ indexLoadOp.open();
+ break;
+ case LOAD_CLOSE:
+ indexLoadOp.close();
+ break;
+ case INSERT_PATCH:
+ for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+ ITupleReference tuple = primaryInsertTupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ StorageTestUtils.waitForOperations(primaryLsmBtree);
+ break;
+ case INDEX_LOAD_PATCH:
+ TupleGenerator secondaryLoadTupleGenerator =
+ new TupleGenerator(SECONDARY_INDEX_VALUE_GENERATOR, secondaryIndexInfo.getSerdes(), 0);
+ FrameTupleAppender secondaryTupleAppender = new FrameTupleAppender(new VSizeFrame(loadTaskCtx));
+ for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+ ITupleReference tuple = secondaryLoadTupleGenerator.next();
+ DataflowUtils.addTupleToFrame(secondaryTupleAppender, tuple, indexLoadOp);
+ }
+ if (secondaryTupleAppender.getTupleCount() > 0) {
+ secondaryTupleAppender.write(indexLoadOp, true);
+ }
+ break;
+ default:
+ break;
+ }
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ } finally {
+ req.complete();
+ }
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index a33bda1..017c59f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -27,7 +27,7 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -122,7 +122,7 @@
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -190,7 +190,7 @@
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -272,7 +272,7 @@
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearSearchCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -356,7 +356,7 @@
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -411,7 +411,7 @@
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -474,7 +474,7 @@
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -533,7 +533,7 @@
// allow all operations
StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -594,7 +594,7 @@
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
@@ -664,7 +664,7 @@
StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
index eb16cf4..b618727 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
@@ -22,7 +22,7 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -118,7 +118,7 @@
throws Exception {
NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext();
IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
- TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ RecordTupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
boolean failed = false;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index e4623fd..79e6368 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -29,7 +29,7 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -92,7 +92,7 @@
private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
private static ITransactionContext txnCtx;
private static LSMInsertDeleteOperatorNodePushable[] insertOps;
- private static TupleGenerator tupleGenerator;
+ private static RecordTupleGenerator tupleGenerator;
private static final int NUM_PARTITIONS = 2;
private static final int PARTITION_0 = 0;
@@ -478,6 +478,8 @@
ILSMMemoryComponent primaryMemComponent = primaryIndexes[partitionIndex].getCurrentMemoryComponent();
ILSMMemoryComponent secondaryMemComponent = secondaryIndexes[partitionIndex].getCurrentMemoryComponent();
Assert.assertEquals(primaryMemComponent.getId(), secondaryMemComponent.getId());
+ Assert.assertEquals(primaryIndexes[partitionIndex].getCurrentMemoryComponentIndex(),
+ secondaryIndexes[partitionIndex].getCurrentMemoryComponentIndex());
}
List<ILSMDiskComponent> primaryDiskComponents = primaryIndexes[partitionIndex].getDiskComponents();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 2121327..e2c99b0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -26,10 +26,9 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ITransactionContext;
@@ -37,10 +36,7 @@
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
@@ -113,22 +109,20 @@
StorageComponentProvider storageManager = new StorageComponentProvider();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
- NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
- partitioningKeys, null, null, null, false, null),
- null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
- storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
+ PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE,
+ META_TYPE, null, storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, 0);
JobId jobId = nc.newJobId();
IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, true);
ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
- LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
- RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp =
+ nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, null).getLeft();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ RecordTupleGenerator tupleGenerator =
+ new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
VSizeFrame marker = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
@@ -178,9 +172,9 @@
nc.newJobId();
TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
Collections.emptyList(), Collections.emptyList(), false);
- IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE,
- META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST,
- storageManager);
+ IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, StorageTestUtils.DATASET, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, storageManager);
emptyTupleOp.open();
emptyTupleOp.close();
Assert.assertEquals(NUM_OF_RECORDS, countOp.getCount());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 1795c93..a7225a1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -29,8 +29,8 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -619,14 +619,14 @@
public class Actor extends SingleThreadEventProcessor<Request> {
private final int partition;
- private final TupleGenerator tupleGenerator;
+ private final RecordTupleGenerator tupleGenerator;
private final VSizeFrame frame;
private final FrameTupleAppender tupleAppender;
public Actor(String name, int partition) throws HyracksDataException {
super(name);
this.partition = partition;
- tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
frame = new VSizeFrame(taskCtxs[partition]);
tupleAppender = new FrameTupleAppender(frame);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 72026a2..61c1fb2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -26,8 +26,8 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
@@ -161,8 +161,8 @@
// except search
lsmBtree.clearSearchCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+ KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
Searcher firstSearcher = null;
@@ -207,8 +207,8 @@
// except search
lsmBtree.clearSearchCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ RecordTupleGenerator tupleGenerator = new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES,
+ KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
Searcher firstSearcher = null;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index d08fc72..589e8b2 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -33,9 +33,9 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.context.DatasetInfo;
@@ -100,7 +100,7 @@
private StorageTestUtils() {
}
- static void allowAllOps(TestLsmBtree lsmBtree) {
+ public static void allowAllOps(TestLsmBtree lsmBtree) {
lsmBtree.clearModifyCallbacks();
lsmBtree.clearFlushCallbacks();
lsmBtree.clearSearchCallbacks();
@@ -118,6 +118,12 @@
KEY_INDICATORS_LIST, partition);
}
+ public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, Dataset dataset, int partition)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, STORAGE_MANAGER, KEY_INDEXES,
+ KEY_INDICATORS_LIST, partition);
+ }
+
public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
return getInsertPipeline(nc, ctx, null);
@@ -131,13 +137,27 @@
}
public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+ Dataset dataset, Index secondaryIndex, IndexOperation op)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex, op).getLeft();
+ }
+
+ public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
}
- public static TupleGenerator getTupleGenerator() {
- return new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
+ public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+ Dataset dataset, Index secondaryIndex)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
+ }
+
+ public static RecordTupleGenerator getTupleGenerator() {
+ return new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
}
@@ -146,6 +166,11 @@
searchAndAssertCount(nc, partition, DATASET, STORAGE_MANAGER, numOfRecords);
}
+ public static void searchAndAssertCount(TestNodeController nc, Dataset dataset, int partition, int numOfRecords)
+ throws HyracksDataException, AlgebricksException {
+ searchAndAssertCount(nc, partition, dataset, STORAGE_MANAGER, numOfRecords);
+ }
+
public static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
StorageComponentProvider storageManager, int numOfRecords)
throws HyracksDataException, AlgebricksException {
@@ -182,6 +207,11 @@
flushPartition(dslLifecycleMgr, lsmBtree, DATASET, async);
}
+ public static void flushPartition(IDatasetLifecycleManager dslLifecycleMgr, Dataset dataset, TestLsmBtree lsmBtree,
+ boolean async) throws Exception {
+ flushPartition(dslLifecycleMgr, lsmBtree, dataset, async);
+ }
+
public static void flushPartition(IDatasetLifecycleManager dslLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
boolean async) throws Exception {
waitForOperations(lsmBtree);
@@ -211,6 +241,11 @@
flush(dsLifecycleMgr, lsmBtree, DATASET, async);
}
+ public static void flush(IDatasetLifecycleManager dsLifecycleMgr, Dataset dataset, TestLsmBtree lsmBtree,
+ boolean async) throws Exception {
+ flush(dsLifecycleMgr, lsmBtree, dataset, async);
+ }
+
public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
boolean async) throws Exception {
waitForOperations(lsmBtree);
@@ -240,6 +275,11 @@
this(nc, partition, DATASET, STORAGE_MANAGER, lsmBtree, numOfRecords);
}
+ public Searcher(TestNodeController nc, Dataset dataset, int partition, TestLsmBtree lsmBtree,
+ int numOfRecords) {
+ this(nc, partition, dataset, STORAGE_MANAGER, lsmBtree, numOfRecords);
+ }
+
public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
TestLsmBtree lsmBtree, int numOfRecords) {
lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index 20875a3..bcf68b5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -28,9 +29,14 @@
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
+import org.apache.asterix.transaction.management.runtime.CommitRuntime;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
@@ -48,6 +54,19 @@
}
@Override
+ public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider,
+ int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
+ return new IPushRuntimeFactory() {
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ return new IPushRuntime[] { new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()),
+ getDatasetId(), primaryKeyFieldPermutation, true,
+ ctx.getTaskAttemptId().getTaskId().getPartition(), true) };
+ }
+ };
+ }
+
+ @Override
public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Index index, ARecordType recordType,
ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties)
throws AlgebricksException {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
index 7a3e475..bee2f8d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TransactionAbortTest.java
@@ -23,7 +23,7 @@
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
@@ -68,7 +68,7 @@
private static IHyracksTaskContext abortCtx;
private static ITransactionContext abortTxnCtx;
private static LSMInsertDeleteOperatorNodePushable abortOp;
- private static TupleGenerator tupleGenerator;
+ private static RecordTupleGenerator tupleGenerator;
@Rule
public TestRule watcher = new TestMethodTracer();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 418282e..3634bf1 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.test.logging;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -26,10 +30,9 @@
import java.util.List;
import org.apache.asterix.app.bootstrap.TestNodeController;
-import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator;
+import org.apache.asterix.app.data.gen.RecordTupleGenerator.GenerationFunction;
import org.apache.asterix.app.nc.RecoveryManager;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.Checkpoint;
@@ -43,14 +46,12 @@
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.test.dataflow.StorageTestUtils;
import org.apache.asterix.transaction.management.service.logging.LogManager;
import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
@@ -60,17 +61,12 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
-
public class CheckpointingTest {
private static final String TEST_CONFIG_FILE_NAME = "cc-small-txn-log-partition.conf";
@@ -116,23 +112,21 @@
nc.init();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
- NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
- partitioningKeys, null, null, null, false, null),
- null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES,
- KEY_INDICATOR_LIST, 0);
+ nc.createPrimaryIndex(StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager,
+ KEY_INDEXES, KEY_INDICATOR_LIST, 0);
JobId jobId = nc.newJobId();
IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false);
ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
- RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp =
+ nc.getInsertPipeline(ctx, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ RecordTupleGenerator tupleGenerator =
+ new RecordTupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
@@ -197,8 +191,9 @@
nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES,
- RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp2 =
+ nc.getInsertPipeline(ctx2, StorageTestUtils.DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
insertOp2.open();
VSizeFrame frame2 = new VSizeFrame(ctx2);
FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
@@ -220,6 +215,7 @@
}
}
Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(Thread th, Throwable ex) {
threadException = true;
exception = ex;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index c7ae2df..62c882d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -25,7 +25,6 @@
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.util.ExternalDataConstants;
@@ -107,6 +106,10 @@
tearDown(cleanup, integrationUtil, true);
}
+ public static void tearDown(boolean cleanup, boolean stopHdfs) throws Exception {
+ tearDown(cleanup, integrationUtil, stopHdfs);
+ }
+
public static void tearDown(boolean cleanup, AsterixHyracksIntegrationUtil integrationUtil, boolean stopHdfs)
throws Exception {
// validateBufferCacheState(); <-- Commented out until bug is fixed -->
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 9eb6259..b6581ec2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -27,6 +27,7 @@
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IReplicaManager;
+import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -65,7 +66,8 @@
IResourceIdFactory getResourceIdFactory();
- void initialize(boolean initialRun) throws IOException, AlgebricksException;
+ void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun)
+ throws IOException, AlgebricksException;
void setShuttingdown(boolean b);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index f4d764a..6e2e320 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -55,7 +55,7 @@
this.setRegistered(false);
this.setMemoryAllocated(false);
this.logManager = logManager;
- waitLog.setLogType(LogType.WAIT);
+ waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index e5d18cf..50a4bef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -23,9 +23,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,7 +32,7 @@
import org.apache.asterix.common.api.IDatasetMemoryManager;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -42,19 +40,16 @@
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
@@ -70,7 +65,7 @@
private final ILocalResourceRepository resourceRepository;
private final IDatasetMemoryManager memoryManager;
private final ILogManager logManager;
- private final LogRecord logRecord;
+ private final LogRecord waitLog;
private final int numPartitions;
private volatile boolean stopped = false;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
@@ -84,7 +79,9 @@
this.memoryManager = memoryManager;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.numPartitions = numPartitions;
- logRecord = new LogRecord();
+ waitLog = new LogRecord();
+ waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
+ waitLog.computeAndSetLogSize();
}
@Override
@@ -371,7 +368,9 @@
@Override
public synchronized void flushAllDatasets() throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
- flushDatasetOpenIndexes(dsr, false);
+ if (dsr.getDatasetInfo().isOpen()) {
+ flushDatasetOpenIndexes(dsr, false);
+ }
}
}
@@ -423,77 +422,48 @@
*/
private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
DatasetInfo dsInfo = dsr.getDatasetInfo();
+ if (!dsInfo.isOpen()) {
+ throw new IllegalStateException("flushDatasetOpenIndexes is called on a dataset that is closed");
+ }
if (dsInfo.isExternal()) {
// no memory components for external dataset
return;
}
+ // ensure all in-flight flushes get scheduled
+ logManager.log(waitLog);
for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
// flush each partition one by one
if (primaryOpTracker.getNumActiveOperations() > 0) {
throw new IllegalStateException(
"flushDatasetOpenIndexes is called on a dataset with currently active operations");
}
- int partition = primaryOpTracker.getPartition();
- Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
- ILSMIndex flushIndex = null;
- for (ILSMIndex lsmIndex : indexes) {
- if (!lsmIndex.isCurrentMutableComponentEmpty()) {
- flushIndex = lsmIndex;
- break;
- }
+ primaryOpTracker.setFlushOnExit(true);
+ primaryOpTracker.flushIfNeeded();
+ }
+ // ensure requested flushes were scheduled
+ logManager.log(waitLog);
+ if (!asyncFlush) {
+ List<FlushOperation> flushes = new ArrayList<>();
+ for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+ flushes.addAll(primaryOpTracker.getScheduledFlushes());
}
- if (flushIndex == null) {
- // all open indexes are empty, nothing to flush
- continue;
- }
- LSMComponentId componentId = (LSMComponentId) flushIndex.getCurrentMemoryComponent().getId();
- ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition);
- idGenerator.refresh();
- if (dsInfo.isDurable()) {
- synchronized (logRecord) {
- TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), partition,
- componentId.getMinId(), componentId.getMaxId(), null);
- try {
- logManager.log(logRecord);
- } catch (ACIDException e) {
- throw new HyracksDataException("could not write flush log while closing dataset", e);
- }
-
- try {
- //notification will come from LogBuffer class (notifyFlushTerminator)
- logRecord.wait();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(e);
- }
- }
- }
- long flushLsn = logRecord.getLSN();
- ILSMComponentId nextComponentId = idGenerator.getId();
- Map<String, Object> flushMap = new HashMap<>();
- flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
- flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
- for (ILSMIndex index : indexes) {
- ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.getOpContext().setParameters(flushMap);
- accessor.scheduleFlush();
- }
- if (!asyncFlush) {
- // Wait for the above flush op.
- dsInfo.waitForIO();
- }
+ LSMIndexUtil.waitFor(flushes);
}
}
private void closeDataset(DatasetResource dsr) throws HyracksDataException {
// First wait for any ongoing IO operations
DatasetInfo dsInfo = dsr.getDatasetInfo();
- dsInfo.waitForIO();
try {
flushDatasetOpenIndexes(dsr, false);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
+ // wait for merges that were scheduled due to the above flush
+ // ideally, we shouldn't need this since merges should still work.
+ // They don't need a special memory budget but there is a problem
+ // for some merge policies that need to access dataset info (correlated prefix)
+ dsInfo.waitForIO();
for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
closeIndex(iInfo);
}
@@ -505,7 +475,9 @@
public synchronized void closeAllDatasets() throws HyracksDataException {
ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
for (DatasetResource dsr : openDatasets) {
- closeDataset(dsr);
+ if (dsr.isOpen()) {
+ closeDataset(dsr);
+ }
}
}
@@ -612,7 +584,7 @@
@Override
public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
for (DatasetResource dsr : datasets.values()) {
- if (replicationStrategy.isMatch(dsr.getDatasetID())) {
+ if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
flushDatasetOpenIndexes(dsr, false);
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 681669f..52c8962 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -19,7 +19,10 @@
package org.apache.asterix.common.context;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,16 +40,19 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
-public class PrimaryIndexOperationTracker extends BaseOperationTracker {
+public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
private final int partition;
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
@@ -54,6 +60,7 @@
private final ILSMComponentIdGenerator idGenerator;
private boolean flushOnExit = false;
private boolean flushLogCreated = false;
+ private final Map<String, FlushOperation> scheduledFlushes = new HashMap<>();
public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
@@ -107,6 +114,7 @@
}
if (needsFlush || flushOnExit) {
+ flushOnExit = false;
// make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering
// them until the current flush is scheduled.
LSMComponentId primaryId = null;
@@ -143,7 +151,6 @@
throw new IllegalStateException("Primary index not found in dataset " + dsInfo.getDatasetID());
}
LogRecord logRecord = new LogRecord();
- flushOnExit = false;
if (dsInfo.isDurable()) {
/*
* Generate a FLUSH log.
@@ -182,16 +189,36 @@
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
- for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
- ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.getOpContext().setParameters(flushMap);
- accessor.scheduleFlush();
+ synchronized (scheduledFlushes) {
+ for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+ ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.getOpContext().setParameters(flushMap);
+ ILSMIOOperation flush = accessor.scheduleFlush();
+ scheduledFlushes.put(flush.getTarget().getRelativePath(), (FlushOperation) flush);
+ flush.addCompleteListener(this);
+ }
}
} finally {
flushLogCreated = false;
}
}
+ @Override
+ public void completed(ILSMIOOperation operation) {
+ synchronized (scheduledFlushes) {
+ scheduledFlushes.remove(operation.getTarget().getRelativePath());
+ }
+ }
+
+ public List<FlushOperation> getScheduledFlushes() {
+ synchronized (scheduledFlushes) {
+ Collection<FlushOperation> scheduled = scheduledFlushes.values();
+ List<FlushOperation> flushes = new ArrayList<FlushOperation>(scheduled.size());
+ flushes.addAll(scheduled);
+ return flushes;
+ }
+ }
+
public int getNumActiveOperations() {
return numActiveOperations.get();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
index 99ab2d0..71d16f7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.common.dataflow;
+import java.util.List;
+
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
public class LSMIndexUtil {
@@ -41,4 +44,15 @@
}
}
}
+
+ public static void waitFor(List<? extends ILSMIOOperation> ioOperations) throws HyracksDataException {
+ for (int i = 0; i < ioOperations.size(); i++) {
+ try {
+ ioOperations.get(i).sync();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 50f5906..ea53d68 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -20,19 +20,21 @@
package org.apache.asterix.common.ioopcallbacks;
import java.util.ArrayDeque;
+import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -45,13 +47,19 @@
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
public class LSMIOOperationCallback implements ILSMIOOperationCallback {
+ private static final Logger LOGGER = LogManager.getLogger();
public static final String KEY_FLUSH_LOG_LSN = "FlushLogLsn";
public static final String KEY_NEXT_COMPONENT_ID = "NextComponentId";
+ public static final String KEY_FLUSHED_COMPONENT_ID = "FlushedComponentId";
private static final String KEY_FIRST_LSN = "FirstLsn";
private static final MutableArrayValueReference KEY_METADATA_FLUSH_LOG_LSN =
new MutableArrayValueReference(KEY_FLUSH_LOG_LSN.getBytes());
@@ -83,8 +91,11 @@
if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
return;
}
- if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
- Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
+ if (operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+ Map<String, Object> map = operation.getParameters();
+ putComponentIdIntoMetadata(operation.getNewComponent(), (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID));
+ } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
+ Map<String, Object> map = operation.getParameters();
putLSNIntoMetadata(operation.getNewComponent(), (Long) map.get(KEY_FLUSH_LOG_LSN));
putComponentIdIntoMetadata(operation.getNewComponent(),
((FlushOperation) operation).getFlushingComponent().getId());
@@ -104,16 +115,64 @@
if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
return;
}
- if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
- Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
- final Long lsn = (Long) map.get(KEY_FLUSH_LOG_LSN);
- final Optional<String> componentFile =
- operation.getNewComponent().getLSMComponentPhysicalFiles().stream().findAny();
- if (componentFile.isPresent()) {
- final ResourceReference ref = ResourceReference.of(componentFile.get());
- final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
- indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn);
+ if (operation.getIOOpertionType() != LSMIOOperationType.LOAD
+ && operation.getAccessor().getOpContext().getOperation() == IndexOperation.DELETE_COMPONENTS) {
+ deleteComponentsFromCheckpoint(operation);
+ } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH
+ || operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+ addComponentToCheckpoint(operation);
+ }
+ }
+
+ private void addComponentToCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+ // will always update the checkpoint file even if no new component was created
+ FileReference target = operation.getTarget();
+ Map<String, Object> map = operation.getParameters();
+ final Long lsn =
+ operation.getIOOpertionType() == LSMIOOperationType.FLUSH ? (Long) map.get(KEY_FLUSH_LOG_LSN) : 0L;
+ final LSMComponentId id = (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID);
+ final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
+ final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
+ indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn, id.getMaxId());
+ }
+
+ private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+ // component was deleted... if a flush, do nothing... if a merge, must update the checkpoint file
+ if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) {
+ // Get component id of the last disk component
+ LSMComponentId mostRecentComponentId =
+ getMostRecentComponentId(operation.getAccessor().getOpContext().getComponentsToBeMerged());
+ // Update the checkpoint file
+ FileReference target = operation.getTarget();
+ final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
+ indexCheckpointManagerProvider.get(ref).setLastComponentId(mostRecentComponentId.getMaxId());
+ } else if (operation.getIOOpertionType() != LSMIOOperationType.FLUSH) {
+ throw new IllegalStateException("Unexpected IO operation: " + operation.getIOOpertionType());
+ }
+ }
+
+ private LSMComponentId getMostRecentComponentId(Collection<ILSMDiskComponent> deletedComponents)
+ throws HyracksDataException {
+ // must sync on opTracker to ensure list of components doesn't change
+ synchronized (lsmIndex.getOperationTracker()) {
+ List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents();
+ if (diskComponents.isEmpty()) {
+ LOGGER.log(Level.INFO, "There are no disk components");
+ return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
}
+ if (deletedComponents.contains(diskComponents.get(diskComponents.size() - 1))) {
+ LOGGER.log(Level.INFO, "All disk components have been deleted");
+ return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
+ }
+ int mostRecentComponentIndex = 0;
+ for (int i = 0; i < diskComponents.size(); i++) {
+ if (!deletedComponents.contains(diskComponents.get(i))) {
+ break;
+ }
+ mostRecentComponentIndex++;
+ }
+ ILSMDiskComponent mostRecentDiskComponent = diskComponents.get(mostRecentComponentIndex);
+ return (LSMComponentId) mostRecentDiskComponent.getId();
}
}
@@ -153,14 +212,6 @@
LSMComponentIdUtils.persist(componentId, newComponent.getMetadata());
}
- /**
- * Used during the recovery process to force refresh the next component id
- */
- public void forceRefreshNextId(ILSMComponentId nextComponentId) {
- componentIds.clear();
- componentIds.add(nextComponentId);
- }
-
public synchronized void setFirstLsnForCurrentMemoryComponent(long firstLsn) {
this.firstLsnForCurrentMemoryComponent = firstLsn;
if (pendingFlushes == 0) {
@@ -195,9 +246,11 @@
dsInfo.declareActiveIOOperation();
if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
pendingFlushes++;
+ FlushOperation flush = (FlushOperation) operation;
Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
Long flushLsn = (Long) map.get(KEY_FLUSH_LOG_LSN);
map.put(KEY_FIRST_LSN, firstLsnForCurrentMemoryComponent);
+ map.put(KEY_FLUSHED_COMPONENT_ID, flush.getFlushingComponent().getId());
componentIds.add((ILSMComponentId) map.get(KEY_NEXT_COMPONENT_ID));
firstLsnForCurrentMemoryComponent = flushLsn; // Advance the first lsn for new component
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index b008f11..2c0872c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -27,31 +27,33 @@
/**
* Initializes the first checkpoint of an index with low watermark {@code lsn}
*
+ * @param componentTimestamp
* @param lsn
* @throws HyracksDataException
*/
- void init(long lsn) throws HyracksDataException;
+ void init(String componentTimestamp, long lsn) throws HyracksDataException;
/**
- * Called when a new LSM disk component is flushed. When called, the index checkpoiint is updated
+ * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
* with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
*
* @param componentTimestamp
* @param lsn
* @throws HyracksDataException
*/
- void flushed(String componentTimestamp, long lsn) throws HyracksDataException;
+ void flushed(String componentTimestamp, long lsn, long componentId) throws HyracksDataException;
/**
- * Called when a new LSM disk component is replicated from master. When called, the index checkpoiint is updated
+ * Called when a new LSM disk component is replicated from master. When called, the index checkpoint is updated
* with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
* new low watermark.
*
* @param componentTimestamp
* @param masterLsn
+ * @param componentId
* @throws HyracksDataException
*/
- void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException;
+ void replicated(String componentTimestamp, long masterLsn, long componentId) throws HyracksDataException;
/**
* Called when a flush log is received and replicated from master. The mapping between
@@ -89,13 +91,37 @@
* Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()}
*
* @return the index last valid component timestamp
+ * @throws HyracksDataException
*/
- Optional<String> getValidComponentTimestamp();
+ Optional<String> getValidComponentTimestamp() throws HyracksDataException;
/**
* Gets the number of valid checkpoints the index has.
*
* @return the number of valid checkpoints
+ * @throws HyracksDataException
*/
- int getCheckpointCount();
+ int getCheckpointCount() throws HyracksDataException;
+
+ /**
+ * @return the latest checkpoint
+ * @throws HyracksDataException
+ */
+ IndexCheckpoint getLatest() throws HyracksDataException;
+
+ /**
+ * Advance the last valid component timestamp. Used for replicated bulk-loaded components
+ *
+ * @param timeStamp
+ * @throws HyracksDataException
+ */
+ void advanceValidComponentTimestamp(String timeStamp) throws HyracksDataException;
+
+ /**
+ * Set the last component id. Used during recovery or after a component is deleted
+ *
+ * @param componentId
+ * @throws HyracksDataException
+ */
+ void setLastComponentId(long componentId) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 6e845e1..73d3122 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -35,24 +36,28 @@
private long id;
private String validComponentTimestamp;
private long lowWatermark;
+ private long lastComponentId;
private Map<Long, Long> masterNodeFlushMap;
- public static IndexCheckpoint first(long lowWatermark) {
+ public static IndexCheckpoint first(String lastComponentTimestamp, long lowWatermark) {
IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
firstCheckpoint.lowWatermark = lowWatermark;
- firstCheckpoint.validComponentTimestamp = null;
+ firstCheckpoint.validComponentTimestamp = lastComponentTimestamp;
+ firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
firstCheckpoint.masterNodeFlushMap = new HashMap<>();
return firstCheckpoint;
}
- public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp) {
+ public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp,
+ long lastComponentId) {
if (lowWatermark < latest.getLowWatermark()) {
throw new IllegalStateException("Low watermark should always be increasing");
}
IndexCheckpoint next = new IndexCheckpoint();
next.id = latest.getId() + 1;
next.lowWatermark = lowWatermark;
+ next.lastComponentId = lastComponentId;
next.validComponentTimestamp = validComponentTimestamp;
next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
// remove any lsn from the map that wont be used anymore
@@ -72,6 +77,10 @@
return lowWatermark;
}
+ public long getLastComponentId() {
+ return lastComponentId;
+ }
+
public Map<Long, Long> getMasterNodeFlushMap() {
return masterNodeFlushMap;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java
new file mode 100644
index 0000000..1da5c9c
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+
+public interface IRecoveryManagerFactory {
+
+ /**
+ * Create the local recovery manager
+ *
+ * @param serviceCtx
+ * the service context
+ * @param txnSubsystem
+ * the transaction subsystem
+ * @return the recovery manager
+ */
+ IRecoveryManager createRecoveryManager(INCServiceContext serviceCtx, ITransactionSubsystem txnSubsystem);
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 5fdb4e2..0c3b21d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -24,7 +24,6 @@
import java.util.zip.CRC32;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -271,6 +270,7 @@
computeAndSetLogSize();
break;
case LogType.WAIT:
+ case LogType.WAIT_FOR_FLUSHES:
computeAndSetLogSize();
break;
case LogType.JOB_COMMIT:
@@ -462,6 +462,7 @@
logSize = FLUSH_LOG_SIZE;
break;
case LogType.WAIT:
+ case LogType.WAIT_FOR_FLUSHES:
logSize = WAIT_LOG_SIZE;
break;
case LogType.FILTER:
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index f02b0de..2d76a11 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -28,6 +28,7 @@
public static final byte WAIT = 6;
public static final byte FILTER = 7;
public static final byte MARKER = 8;
+ public static final byte WAIT_FOR_FLUSHES = 9;
private static final String STRING_UPDATE = "UPDATE";
private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -37,6 +38,7 @@
private static final String STRING_WAIT = "WAIT";
private static final String STRING_FILTER = "FILTER";
private static final String STRING_MARKER = "MARKER";
+ private static final String STRING_WAIT_FOR_FLUSHES = "WAIT_FOR_FLUSHES";
private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
public static String toString(byte logType) {
@@ -53,6 +55,8 @@
return STRING_FLUSH;
case LogType.WAIT:
return STRING_WAIT;
+ case LogType.WAIT_FOR_FLUSHES:
+ return STRING_WAIT_FOR_FLUSHES;
case LogType.FILTER:
return STRING_FILTER;
case LogType.MARKER:
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 6b13468..aa2c7af 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,17 +19,17 @@
package org.apache.asterix.common.utils;
import java.io.File;
+import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.function.Function;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.storage.IndexPathElements;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.MappedFileSplit;
import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -143,4 +143,16 @@
public static String getIndexNameFromPath(String path) {
return Paths.get(path).getFileName().toString();
}
+
+ /**
+ * Get the path of the index containing the passed reference
+ *
+ * @param ioManager
+ * @param ref
+ * @return the path of the index directory that contains the passed reference
+ * @throws HyracksDataException
+ */
+ public static Path getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
+ return ioManager.resolve(ref.getRelativePath().toString()).getFile().toPath();
+ }
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 7af7b6e..29a2aa0 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -19,6 +19,9 @@
package org.apache.asterix.test.ioopcallbacks;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -35,6 +38,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -60,6 +64,15 @@
* 7. destroy
*/
+ private static final Format FORMATTER =
+ new SimpleDateFormat(AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT);
+
+ private static String getComponentFileName() {
+ Date date = new Date();
+ String ts = FORMATTER.format(date);
+ return ts + '_' + ts;
+ }
+
@Test
public void testNormalSequence() throws HyracksDataException {
int numMemoryComponents = 2;
@@ -81,7 +94,7 @@
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
ILSMIndexAccessor firstAccessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
firstAccessor.getOpContext().setParameters(flushMap);
- FileReference firstTarget = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+ FileReference firstTarget = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
LSMComponentFileReferences firstFiles = new LSMComponentFileReferences(firstTarget, firstTarget, firstTarget);
FlushOperation firstFlush = new TestFlushOperation(firstAccessor, firstTarget, callback, indexId, firstFiles,
new LSMComponentId(0, 0));
@@ -97,7 +110,7 @@
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
ILSMIndexAccessor secondAccessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
secondAccessor.getOpContext().setParameters(flushMap);
- FileReference secondTarget = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+ FileReference secondTarget = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
LSMComponentFileReferences secondFiles =
new LSMComponentFileReferences(secondTarget, secondTarget, secondTarget);
FlushOperation secondFlush = new TestFlushOperation(secondAccessor, secondTarget, callback, indexId,
@@ -175,7 +188,7 @@
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, expectedId);
ILSMIndexAccessor accessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
accessor.getOpContext().setParameters(flushMap);
- FileReference target = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+ FileReference target = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
LSMComponentFileReferences files = new LSMComponentFileReferences(target, target, target);
FlushOperation flush =
new TestFlushOperation(accessor, target, callback, indexId, files, new LSMComponentId(0, 0));
@@ -210,7 +223,7 @@
IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
Mockito.mock(IIndexCheckpointManagerProvider.class);
IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class);
- Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong());
+ Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong(), Mockito.anyLong());
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
return indexCheckpointManagerProvider;
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
index 9ac143c..c2621d8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
@@ -141,7 +141,7 @@
}
@Override
- public void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType)
+ public void scheduleReplication(List<ILSMDiskComponent> diskComponents, LSMOperationType opType)
throws HyracksDataException {
throw new UnsupportedOperationException();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
index 79dc396..df4c093 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
@@ -18,8 +18,11 @@
*/
package org.apache.asterix.external.operators;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.FileIndexTupleTranslator;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -36,7 +39,8 @@
import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
/**
@@ -74,10 +78,13 @@
// Open the index
indexHelper.open();
try {
- IIndex index = indexHelper.getIndexInstance();
+ ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID,
+ LSMComponentId.DEFAULT_COMPONENT_ID);
// Create bulk loader
IIndexBulkLoader bulkLoader =
- index.createBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false);
+ index.createBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false, parameters);
// Load files
for (ExternalFile file : files) {
bulkLoader.add(filesTupleTranslator.getTupleFromFile(file));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
index 4bc2867..3bada4a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
@@ -18,7 +18,9 @@
*/
package org.apache.asterix.external.operators;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.indexing.ExternalFile;
@@ -69,9 +71,10 @@
indexHelper.open();
IIndex index = indexHelper.getIndexInstance();
LSMTwoPCBTreeBulkLoader bulkLoader = null;
+ Map<String, Object> parameters = new HashMap<>();
try {
bulkLoader = (LSMTwoPCBTreeBulkLoader) ((ExternalBTree) index)
- .createTransactionBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size());
+ .createTransactionBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), parameters);
// Load files
// The files must be ordered according to their numbers
for (ExternalFile file : files) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
index 573de5d..74bc0dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
@@ -18,12 +18,18 @@
*/
package org.apache.asterix.external.operators;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
public class ExternalIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
@@ -43,4 +49,12 @@
super.open();
((ITwoPCIndex) index).setCurrentVersion(version);
}
+
+ @Override
+ protected void initializeBulkLoader() throws HyracksDataException {
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, LSMComponentId.DEFAULT_COMPONENT_ID);
+ bulkLoader = ((ILSMIndex) index).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ parameters);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index aaca3f1..57e2917 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -19,6 +19,8 @@
package org.apache.asterix.external.operators;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.asterix.external.indexing.FilesIndexDescription;
import org.apache.asterix.om.base.AMutableInt32;
@@ -60,8 +62,9 @@
try {
writer.open();
// Transactional BulkLoader
- bulkLoader =
- ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length);
+ Map<String, Object> parameters = new HashMap<>();
+ bulkLoader = ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length,
+ parameters);
// Delete files
for (int i = 0; i < deletedFiles.length; i++) {
fileNumber.setValue(deletedFiles[i]);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 84922cd..d4d601c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -22,16 +22,20 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.file.Path;
import java.util.Collection;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.common.LocalResource;
/**
@@ -51,13 +55,27 @@
appCtx.getIndexCheckpointManagerProvider();
PersistentLocalResourceRepository resRepo =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+ final IIOManager ioManager = appCtx.getIoManager();
final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values();
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
for (LocalResource ls : partitionResources) {
- final IIndexCheckpointManager indexCheckpointManager =
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(ls));
+ DatasetResourceReference ref = DatasetResourceReference.of(ls);
+ final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
indexCheckpointManager.delete();
- indexCheckpointManager.init(currentLSN);
+ // Find the most recent component end-timestamp among existing files so re-initializing the checkpoint does not invalidate them
+ Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
+ String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+ if (files == null) {
+ throw HyracksDataException
+ .create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
+ }
+ String mostRecentTimestamp = null;
+ for (String file : files) {
+ String nextTimeStamp = AbstractLSMIndexFileManager.getComponentEndTime(file);
+ mostRecentTimestamp = mostRecentTimestamp == null || nextTimeStamp.compareTo(mostRecentTimestamp) > 0
+ ? nextTimeStamp : mostRecentTimestamp;
+ }
+ indexCheckpointManager.init(mostRecentTimestamp, currentLSN);
}
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index 57474ef..a4f9b43 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
@@ -43,17 +44,21 @@
public class MarkComponentValidTask implements IReplicaTask {
private final long masterLsn;
+ private final long lastComponentId;
private final String file;
- public MarkComponentValidTask(String file, long masterLsn) {
+ public MarkComponentValidTask(String file, long masterLsn, long lastComponentId) {
this.file = file;
+ this.lastComponentId = lastComponentId;
this.masterLsn = masterLsn;
}
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
try {
- if (masterLsn > 0) {
+ if (masterLsn == IndexSynchronizer.BULKLOAD_LSN) {
+ updateBulkLoadedLastComponentTimestamp(appCtx);
+ } else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
ensureComponentLsnFlushed(appCtx);
}
// delete mask
@@ -65,6 +70,15 @@
}
}
+ private void updateBulkLoadedLastComponentTimestamp(INcApplicationContext appCtx) throws HyracksDataException {
+ final ResourceReference indexRef = ResourceReference.of(file);
+ final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
+ final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
+ final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
+ indexCheckpointManager.advanceValidComponentTimestamp(componentEndTime);
+
+ }
+
private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
throws HyracksDataException, InterruptedException {
final ResourceReference indexRef = ResourceReference.of(file);
@@ -82,7 +96,7 @@
replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
- indexCheckpointManager.replicated(componentEndTime, masterLsn);
+ indexCheckpointManager.replicated(componentEndTime, masterLsn, lastComponentId);
}
}
@@ -97,6 +111,7 @@
final DataOutputStream dos = new DataOutputStream(out);
dos.writeUTF(file);
dos.writeLong(masterLsn);
+ dos.writeLong(lastComponentId);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -105,6 +120,7 @@
public static MarkComponentValidTask create(DataInput input) throws IOException {
final String indexFile = input.readUTF();
final long lsn = input.readLong();
- return new MarkComponentValidTask(indexFile, lsn);
+ final long lastComponentId = input.readLong();
+ return new MarkComponentValidTask(indexFile, lsn, lastComponentId);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index ca0fcca..20663d1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -30,12 +30,12 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -98,7 +98,7 @@
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
indexCheckpointManager.delete();
- indexCheckpointManager.init(currentLSN);
+ indexCheckpointManager.init(null, currentLSN);
LOGGER.info(() -> "Checkpoint index: " + indexRef);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 0e07ac7..30a5595 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -37,12 +37,15 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class IndexSynchronizer {
private static final Logger LOGGER = LogManager.getLogger();
+ public static final long MERGE_LSN = -1;
+ public static final long BULKLOAD_LSN = -2;
private final IReplicationJob job;
private final INcApplicationContext appCtx;
@@ -91,7 +94,8 @@
final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, replica);
job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate);
// send mark component valid
- MarkComponentValidTask markValidTask = new MarkComponentValidTask(indexFile, getReplicatedComponentLsn());
+ MarkComponentValidTask markValidTask =
+ new MarkComponentValidTask(indexFile, getReplicatedComponentLsn(), getReplicatedComponentId());
ReplicationProtocol.sendTo(replica, markValidTask);
ReplicationProtocol.waitForAck(replica);
LOGGER.debug("Replicated component ({}) to replica {}", indexFile, replica);
@@ -118,6 +122,12 @@
private long getReplicatedComponentLsn() throws HyracksDataException {
final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job;
+ if (indexReplJob.getLSMOpType() == LSMOperationType.MERGE) {
+ return MERGE_LSN;
+ } else if (indexReplJob.getLSMOpType() == LSMOperationType.LOAD) {
+ return BULKLOAD_LSN;
+ }
+
if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
return LSMIOOperationCallback.INVALID_LSN;
}
@@ -126,4 +136,14 @@
return ((LSMIOOperationCallback) lsmIndex.getIOOperationCallback())
.getComponentLSN(ctx.getComponentsToBeReplicated());
}
+
+ private long getReplicatedComponentId() throws HyracksDataException {
+ final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job;
+ if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
+ return -1L;
+ }
+ final ILSMIndexOperationContext ctx = indexReplJob.getLSMIndexOperationContext();
+ LSMComponentId id = (LSMComponentId) ctx.getComponentsToBeReplicated().get(0).getId();
+ return id.getMinId();
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
index 2415556..4130490 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -18,10 +18,13 @@
*/
package org.apache.asterix.runtime.operators;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -33,7 +36,6 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
@@ -71,29 +73,26 @@
@Override
protected void initializeBulkLoader() throws HyracksDataException {
ILSMIndex targetIndex = (ILSMIndex) index;
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, LSMComponentId.DEFAULT_COMPONENT_ID);
if (usage.equals(BulkLoadUsage.LOAD)) {
- // for a loaded dataset, we use the default Id 0 which is guaranteed to be smaller
- // than Ids of all memory components
-
- // TODO handle component Id for datasets loaded multiple times
- // TODO move this piece of code to io operation callback
- bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
- ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
- LSMComponentIdUtils.persist(LSMComponentId.DEFAULT_COMPONENT_ID, diskComponent.getMetadata());
+ bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ parameters);
} else {
primaryIndexHelper.open();
primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
- bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
if (!primaryComponents.isEmpty()) {
- // TODO move this piece of code to io operation callback
- // Ideally, this should be done in io operation callback when a bulk load operation is finished
- // However, currently we don't have an extensible callback mechanism to support this
ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
primaryComponents.get(primaryComponents.size() - 1).getId());
- ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
- LSMComponentIdUtils.persist(bulkloadId, diskComponent.getMetadata());
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, bulkloadId);
+ } else {
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID,
+ LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID);
}
+ bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ parameters);
+
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 6d9ec47..1029d6f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -19,7 +19,10 @@
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -32,9 +35,9 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
/**
+ * Note: only used with correlated merge policy
* This operator node is used to bulk load incoming tuples (scanned from the primary index)
* into multiple disk components of the secondary index.
* Incoming tuple format:
@@ -182,14 +185,12 @@
private void loadNewComponent(int componentPos) throws HyracksDataException {
endCurrentComponent();
-
int numTuples = getNumDeletedTuples(componentPos);
ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos);
- componentBulkLoader =
- (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples, false);
- ILSMDiskComponent diskComponent = componentBulkLoader.getComponent();
- // TODO move this piece of code to io operation callback
- LSMComponentIdUtils.persist(primaryComponent.getId(), diskComponent.getMetadata());
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, primaryComponent.getId());
+ componentBulkLoader = (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples,
+ false, parameters);
}
private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index d61e9a0..7e42d14 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -22,6 +22,7 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ILockManager;
@@ -82,7 +83,13 @@
// lock the dataset granule
lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
// flush the dataset synchronously
- datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
+ DatasetInfo datasetInfo = datasetLifeCycleManager.getDatasetInfo(datasetId.getId());
+ // TODO: Remove the isOpen check and let it fail if flush is requested for a dataset that is closed
+ synchronized (datasetLifeCycleManager) {
+ if (datasetInfo.isOpen()) {
+ datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
+ }
+ }
} catch (ACIDException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 0375c30..e1963cb 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -186,7 +186,7 @@
}
resourceCache.put(resource.getPath(), resource);
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(0);
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(null, 0);
//if replication enabled, send resource metadata info to remote nodes
if (isReplicationEnabled) {
createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
@@ -429,15 +429,20 @@
}
private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
+ final Format formatter = THREAD_LOCAL_FORMATTER.get();
+ final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
+ if (indexComponentFiles == null) {
+ throw new IOException(index + " doesn't exist or an IO error occurred");
+ }
final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
if (!validComponentTimestamp.isPresent()) {
- // index doesn't have any components
- return;
- }
- final Format formatter = THREAD_LOCAL_FORMATTER.get();
- final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
- final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
- if (indexComponentFiles != null) {
+ // index doesn't have any valid component, delete all
+ for (File componentFile : indexComponentFiles) {
+ LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
+ Files.delete(componentFile.toPath());
+ }
+ } else {
+ final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
for (File componentFile : indexComponentFiles) {
// delete any file with startTime > validTimestamp
final String fileStartTimeStr =
@@ -505,7 +510,8 @@
* e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
* will return a component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439
*
- * @param componentFile any component file
+ * @param componentFile
+ * any component file
* @return The component id
*/
public static String getComponentId(String componentFile) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 21268e5..4085fb4 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -89,8 +89,7 @@
public void append(ILogRecord logRecord, long appendLsn) {
logRecord.writeLogRecord(appendBuffer);
- if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
- && logRecord.getLogType() != LogType.WAIT) {
+ if (isLocalTransactionLog(logRecord)) {
logRecord.getTxnCtx().setLastLSN(appendLsn);
}
@@ -100,13 +99,10 @@
LOGGER.info("append()| appendOffset: " + appendOffset);
}
if (logRecord.getLogSource() == LogSource.LOCAL) {
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
- || logRecord.getLogType() == LogType.WAIT) {
+ if (syncPendingNonFlushLog(logRecord)) {
logRecord.isFlushed(false);
syncCommitQ.add(logRecord);
- }
- if (logRecord.getLogType() == LogType.FLUSH) {
- logRecord.isFlushed(false);
+ } else if (logRecord.getLogType() == LogType.FLUSH) {
flushQ.add(logRecord);
}
} else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
@@ -117,6 +113,16 @@
}
}
+ private boolean syncPendingNonFlushLog(ILogRecord logRecord) {
+ return logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+ || logRecord.getLogType() == LogType.WAIT || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES;
+ }
+
+ private boolean isLocalTransactionLog(ILogRecord logRecord) {
+ return logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
+ && logRecord.getLogType() != LogType.WAIT && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES;
+ }
+
@Override
public void setFileChannel(FileChannel fileChannel) {
this.fileChannel = fileChannel;
@@ -231,7 +237,8 @@
notifyJobTermination();
} else if (logRecord.getLogType() == LogType.FLUSH) {
notifyFlushTermination();
- } else if (logRecord.getLogType() == LogType.WAIT) {
+ } else if (logRecord.getLogType() == LogType.WAIT
+ || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
notifyWaitTermination();
}
} else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 736de07..be227ec 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -32,7 +32,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -134,13 +133,33 @@
@Override
public void log(ILogRecord logRecord) {
- if (logRecord.getLogType() == LogType.FLUSH) {
- flushLogsQ.add(logRecord);
- return;
+ if (!logToFlushQueue(logRecord)) {
+ appendToLogTail(logRecord);
}
- appendToLogTail(logRecord);
}
+ @SuppressWarnings("squid:S2445")
+ protected boolean logToFlushQueue(ILogRecord logRecord) {
+        // Remote flush logs do not need to be flushed separately since they may not trigger a local flush
+ if ((logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL)
+ || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
+ logRecord.isFlushed(false);
+ flushLogsQ.add(logRecord);
+ if (logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
+ InvokeUtil.doUninterruptibly(() -> {
+ synchronized (logRecord) {
+ while (!logRecord.isFlushed()) {
+ logRecord.wait();
+ }
+ }
+ });
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @SuppressWarnings("squid:S2445")
protected void appendToLogTail(ILogRecord logRecord) {
syncAppendToLogTail(logRecord);
if (waitForFlush(logRecord) && !logRecord.isFlushed()) {
@@ -161,7 +180,7 @@
synchronized void syncAppendToLogTail(ILogRecord logRecord) {
if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
- && logRecord.getLogType() != LogType.WAIT) {
+ && logRecord.getLogType() != LogType.WAIT && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES) {
ITransactionContext txnCtx = logRecord.getTxnCtx();
if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
throw new ACIDException(
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 1e13883..d2e9629 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -40,9 +40,11 @@
super(txnSubsystem);
}
+ @SuppressWarnings("squid:S2445")
@Override
public void log(ILogRecord logRecord) {
- boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
+ boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT
+ && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES;
if (shouldReplicate) {
switch (logRecord.getLogType()) {
case LogType.ENTITY_COMMIT:
@@ -63,16 +65,12 @@
}
}
logRecord.setReplicate(shouldReplicate);
-
- //Remote flush logs do not need to be flushed separately since they may not trigger local flush
- if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
- flushLogsQ.add(logRecord);
- return;
+ if (!logToFlushQueue(logRecord)) {
+ appendToLogTail(logRecord);
}
-
- appendToLogTail(logRecord);
}
+ @SuppressWarnings("squid:S2445")
@Override
protected void appendToLogTail(ILogRecord logRecord) {
syncAppendToLogTail(logRecord);
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
index 16c0afa..c7be7eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
@@ -39,6 +39,6 @@
/**
* Abort the bulk modify op
*/
- public void abort();
+ public void abort() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
index cfffbe1..a504f7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -395,16 +396,16 @@
// For initial load
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
- throws HyracksDataException {
- return new LSMTwoPCBTreeBulkLoader(fillLevel, verifyInput, numElementsHint, false);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
+ Map<String, Object> parameters) throws HyracksDataException {
+ return new LSMTwoPCBTreeBulkLoader(fillLevel, verifyInput, numElementsHint, false, parameters);
}
// For transaction bulk load <- could consolidate with the above method ->
@Override
- public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
- throws HyracksDataException {
- return new LSMTwoPCBTreeBulkLoader(fillLevel, verifyInput, numElementsHint, true);
+ public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
+ Map<String, Object> parameters) throws HyracksDataException {
+ return new LSMTwoPCBTreeBulkLoader(fillLevel, verifyInput, numElementsHint, true, parameters);
}
// The bulk loader used for both initial loading and transaction
@@ -412,19 +413,35 @@
public class LSMTwoPCBTreeBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader {
private final ILSMDiskComponent component;
private final ILSMDiskComponentBulkLoader componentBulkLoader;
+ private final LoadOperation loadOp;
private final boolean isTransaction;
public LSMTwoPCBTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
- boolean isTransaction) throws HyracksDataException {
+ boolean isTransaction, Map<String, Object> parameters) throws HyracksDataException {
this.isTransaction = isTransaction;
// Create the appropriate target
+ LSMComponentFileReferences componentFileRefs;
if (isTransaction) {
- component = createTransactionTarget();
+ try {
+ componentFileRefs = fileManager.getNewTransactionFileReference();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ component = createDiskComponent(transactionComponentFactory,
+ componentFileRefs.getInsertIndexFileReference(), null,
+ componentFileRefs.getBloomFilterFileReference(), true);
} else {
- component = createBulkLoadTarget();
+ componentFileRefs = fileManager.getRelFlushFileReference();
+ component =
+ createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getDeleteIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), true);
}
- LoadOperation loadOp = new LoadOperation(ioOpCallback, getIndexIdentifier());
+ loadOp = new LoadOperation(componentFileRefs, ioOpCallback, getIndexIdentifier(), parameters);
+ loadOp.setNewComponent(component);
+ ioOpCallback.scheduled(loadOp);
+ ioOpCallback.beforeOperation(loadOp);
componentBulkLoader =
component.createBulkLoader(loadOp, fillFactor, verifyInput, numElementsHint, false, true, true);
}
@@ -438,16 +455,23 @@
@Override
public void end() throws HyracksDataException {
- componentBulkLoader.end();
- if (component.getComponentSize() > 0) {
- if (isTransaction) {
- // Since this is a transaction component, validate and
- // deactivate. it could later be added or deleted
- component.markAsValid(durable);
- component.deactivate();
- } else {
- getHarness().addBulkLoadedComponent(component);
+ try {
+ ioOpCallback.afterOperation(loadOp);
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ if (isTransaction) {
+ // Since this is a transaction component, validate and
+                        // deactivate. It could later be added or deleted
+ component.markAsValid(durable);
+ ioOpCallback.afterFinalize(loadOp);
+ component.deactivate();
+ } else {
+ ioOpCallback.afterFinalize(loadOp);
+ getHarness().addBulkLoadedComponent(component);
+ }
}
+ } finally {
+ ioOpCallback.completed(loadOp);
}
}
@@ -459,26 +483,13 @@
}
@Override
- public void abort() {
+ public void abort() throws HyracksDataException {
try {
componentBulkLoader.abort();
- } catch (Exception e) {
- // Do nothing
+ } finally {
+ ioOpCallback.completed(loadOp);
}
}
-
- // This method is used to create a target for a bulk modify operation. This
- // component must then be either committed or deleted
- private ILSMDiskComponent createTransactionTarget() throws HyracksDataException {
- LSMComponentFileReferences componentFileRefs;
- try {
- componentFileRefs = fileManager.getNewTransactionFileReference();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- return createDiskComponent(transactionComponentFactory, componentFileRefs.getInsertIndexFileReference(),
- null, componentFileRefs.getBloomFilterFileReference(), true);
- }
}
// The accessor for disk only indexes don't use modification callback and always carry the target index version with them
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
index 3e762c1..b727a39 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -216,16 +217,16 @@
// For initial load
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
- throws HyracksDataException {
- return new LSMTwoPCBTreeWithBuddyBulkLoader(fillLevel, verifyInput, 0, false);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
+ Map<String, Object> parameters) throws HyracksDataException {
+ return new LSMTwoPCBTreeWithBuddyBulkLoader(fillLevel, verifyInput, 0, false, parameters);
}
// For transaction bulk load <- could consolidate with the above method ->
@Override
- public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
- throws HyracksDataException {
- return new LSMTwoPCBTreeWithBuddyBulkLoader(fillLevel, verifyInput, numElementsHint, true);
+ public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
+ Map<String, Object> parameters) throws HyracksDataException {
+ return new LSMTwoPCBTreeWithBuddyBulkLoader(fillLevel, verifyInput, numElementsHint, true, parameters);
}
@Override
@@ -494,20 +495,37 @@
// modifications
public class LSMTwoPCBTreeWithBuddyBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader {
private final ILSMDiskComponent component;
+ private final LoadOperation loadOp;
private final ILSMDiskComponentBulkLoader componentBulkLoader;
private final boolean isTransaction;
public LSMTwoPCBTreeWithBuddyBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
- boolean isTransaction) throws HyracksDataException {
+ boolean isTransaction, Map<String, Object> parameters) throws HyracksDataException {
this.isTransaction = isTransaction;
// Create the appropriate target
+ LSMComponentFileReferences componentFileRefs;
if (isTransaction) {
- component = createTransactionTarget();
+ try {
+ componentFileRefs = fileManager.getNewTransactionFileReference();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ component =
+ createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getDeleteIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), true);
} else {
- component = createBulkLoadTarget();
+ componentFileRefs = fileManager.getRelFlushFileReference();
+ component =
+ createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getDeleteIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), true);
}
- LoadOperation loadOp = new LoadOperation(ioOpCallback, getIndexIdentifier());
+ loadOp = new LoadOperation(componentFileRefs, ioOpCallback, getIndexIdentifier(), parameters);
+ loadOp.setNewComponent(component);
+ ioOpCallback.scheduled(loadOp);
+ ioOpCallback.beforeOperation(loadOp);
componentBulkLoader =
component.createBulkLoader(loadOp, fillFactor, verifyInput, numElementsHint, false, true, false);
}
@@ -519,16 +537,23 @@
@Override
public void end() throws HyracksDataException {
- componentBulkLoader.end();
- if (component.getComponentSize() > 0) {
- if (isTransaction) {
- // Since this is a transaction component, validate and
- // deactivate. it could later be added or deleted
- component.markAsValid(durable);
- component.deactivate();
- } else {
- getHarness().addBulkLoadedComponent(component);
+ try {
+ ioOpCallback.afterOperation(loadOp);
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ if (isTransaction) {
+ // Since this is a transaction component, validate and
+                        // deactivate. It could later be added or deleted
+ component.markAsValid(durable);
+ ioOpCallback.afterFinalize(loadOp);
+ component.deactivate();
+ } else {
+ ioOpCallback.afterFinalize(loadOp);
+ getHarness().addBulkLoadedComponent(component);
+ }
}
+ } finally {
+ ioOpCallback.completed(loadOp);
}
}
@@ -538,26 +563,17 @@
}
@Override
- public void abort() {
+ public void abort() throws HyracksDataException {
try {
- componentBulkLoader.abort();
- } catch (Exception e) {
+ try {
+ componentBulkLoader.abort();
+ } finally {
+ ioOpCallback.afterFinalize(loadOp);
+ }
+ } finally {
+ ioOpCallback.completed(loadOp);
}
}
-
- // This method is used to create a target for a bulk modify operation. This
- // component must then eventually be either committed or deleted
- private ILSMDiskComponent createTransactionTarget() throws HyracksDataException {
- LSMComponentFileReferences componentFileRefs;
- try {
- componentFileRefs = fileManager.getNewTransactionFileReference();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- return createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
- componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
- true);
- }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index 7acc59f..2ad4fee 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -26,11 +26,13 @@
public class LSMBTreeFlushOperation extends FlushOperation {
private final FileReference bloomFilterFlushTarget;
+ private final LSMComponentFileReferences fileReferences;
public LSMBTreeFlushOperation(ILSMIndexAccessor accessor, FileReference flushTarget,
FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
super(accessor, flushTarget, callback, indexIdentifier);
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
+ fileReferences = new LSMComponentFileReferences(target, null, bloomFilterFlushTarget);
}
public FileReference getBloomFilterTarget() {
@@ -39,6 +41,6 @@
@Override
public LSMComponentFileReferences getComponentFiles() {
- return new LSMComponentFileReferences(target, null, bloomFilterFlushTarget);
+ return fileReferences;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
index 1bfea53..fef59e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -148,4 +148,9 @@
* @throws HyracksDataException
*/
void schedule(LSMIOOperationType ioOperationType) throws HyracksDataException;
+
+ /**
+ * @return the number of readers inside a component
+ */
+ int getReaderCount();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
index 5662862..c3835eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentId.java
@@ -38,4 +38,14 @@
boolean missing();
IdCompareResult compareTo(ILSMComponentId id);
+
+ /**
+ * @return the min Id
+ */
+ long getMinId();
+
+ /**
+ * @return the max Id
+ */
+ long getMaxId();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 8a041ec..c4a0352 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -169,13 +169,11 @@
* the operation context
* @param diskComponents
* the disk component to be replicated
- * @param bulkload
- * true if the components were bulk loaded, false otherwise
* @param opType
* The operation type
* @throws HyracksDataException
*/
- void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents, boolean bulkload,
+ void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents,
LSMOperationType opType) throws HyracksDataException;
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 88dcc5a..3245455 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.storage.am.lsm.common.api;
+import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -135,4 +136,16 @@
* @throws InterruptedException
*/
void sync() throws InterruptedException;
+
+ /**
+ * Add a listener for operation complete event
+ *
+ * @param listener
+ */
+ void addCompleteListener(IoOperationCompleteListener listener);
+
+ /**
+ * Get parameters passed when calling this IO operation
+ */
+ Map<String, Object> getParameters();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 1374524..5cb05a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -20,6 +20,7 @@
package org.apache.hyracks.storage.am.lsm.common.api;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
@@ -28,6 +29,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMHarness;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.ISearchPredicate;
@@ -128,7 +130,7 @@
boolean isCurrentMutableComponentEmpty() throws HyracksDataException;
- void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents, boolean bulkload,
+ void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents,
ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException;
boolean isMemoryComponentsAllocated();
@@ -185,4 +187,28 @@
*/
void cleanUpFilesForFailedOperation(ILSMIOOperation operation);
+ /**
+ * @return the absolute path of the index
+ */
+ String getIndexIdentifier();
+
+ /**
+ * Create a bulk loader
+ *
+ * @param fillFactor
+ * @param verifyInput
+ * @param numElementsHint
+ * @param checkIfEmptyIndex
+ * @param parameters
+ * @return
+ * @throws HyracksDataException
+ */
+ IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, Map<String, Object> parameters) throws HyracksDataException;
+
+ /**
+ * Reset the current memory component id to 0.
+ */
+ void resetCurrentComponentIndex();
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index 34844d6..42d3ab7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -186,13 +186,11 @@
*
* @param diskComponents
* the components to be replicated
- * @param bulkload
- * true if the components were bulkloaded, false otherwise
* @param opType
* the operation type
* @throws HyracksDataException
*/
- void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType)
+ void scheduleReplication(List<ILSMDiskComponent> diskComponents, LSMOperationType opType)
throws HyracksDataException;
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ITwoPCIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ITwoPCIndex.java
index fc1d5f4..74a5d60 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ITwoPCIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ITwoPCIndex.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.storage.am.lsm.common.api;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
@@ -30,8 +31,8 @@
* This function is used to create a BulkLoader for a transaction that is capable of insertions and deletions
* and the bulk loaded component is hidden from the index
*/
- public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
- throws HyracksDataException;
+ public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
+ Map<String, Object> parameters) throws HyracksDataException;
/**
* This function is used to commit the previous transaction if it was resulted in creating any components
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IoOperationCompleteListener.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IoOperationCompleteListener.java
new file mode 100644
index 0000000..e5ba81e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IoOperationCompleteListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+public interface IoOperationCompleteListener {
+
+ /**
+ * Called when an IO operation completes
+ *
+ * @param operation
+ */
+ void completed(ILSMIOOperation operation);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java
index 63d2697..e200bfd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java
@@ -22,6 +22,7 @@
SEARCH,
MODIFICATION,
FORCE_MODIFICATION,
+ LOAD,
FLUSH,
MERGE,
REPLICATE,
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
index b3252d9..fc9a362 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java
@@ -18,6 +18,10 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.ExceptionUtils;
@@ -25,6 +29,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
public abstract class AbstractIoOperation implements ILSMIOOperation {
@@ -36,6 +41,7 @@
private LSMIOOperationStatus status = LSMIOOperationStatus.SUCCESS;
private ILSMDiskComponent newComponent;
private boolean completed = false;
+ private List<IoOperationCompleteListener> completeListeners;
public AbstractIoOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
String indexIdentifier) {
@@ -77,6 +83,7 @@
@Override
public void setFailure(Throwable failure) {
+ status = LSMIOOperationStatus.FAILURE;
this.failure = ExceptionUtils.suppress(this.failure, failure);
}
@@ -107,6 +114,12 @@
}
callback.completed(this);
completed = true;
+ if (completeListeners != null) {
+ for (IoOperationCompleteListener listener : completeListeners) {
+ listener.completed(this);
+ }
+ completeListeners = null;
+ }
notifyAll();
}
@@ -116,4 +129,21 @@
wait();
}
}
+
+ @Override
+ public Map<String, Object> getParameters() {
+ return accessor.getOpContext().getParameters();
+ }
+
+ @Override
+ public synchronized void addCompleteListener(IoOperationCompleteListener listener) {
+ if (completed) {
+ listener.completed(this);
+ } else {
+ if (completeListeners == null) {
+ completeListeners = new LinkedList<>();
+ }
+ completeListeners.add(listener);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
index 84d2fe5..574a371 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
@@ -51,4 +51,9 @@
public final AbstractLSMIndex getLsmIndex() {
return lsmIndex;
}
+
+ @Override
+ public int getReaderCount() {
+ return readerCount;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index a999b25..c9fb328 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -385,7 +385,8 @@
List<ILSMDiskComponent> mergingComponents = ctx.getComponentsToBeMerged();
// Merge operation can fail if another merge is already scheduled on those components
// This should be guarded against by the merge policy but we still protect against here
- if (isDeactivating || mergingComponents.size() < 2 && ctx.getOperation() != IndexOperation.DELETE_COMPONENTS) {
+ if (isDeactivating
+ || (mergingComponents.size() < 2 && ctx.getOperation() != IndexOperation.DELETE_COMPONENTS)) {
return NoOpIoOperation.INSTANCE;
}
for (int i = 0; i < mergingComponents.size(); i++) {
@@ -398,7 +399,7 @@
mergeCtx.setOperation(ctx.getOperation());
mergeCtx.getComponentHolder().addAll(mergingComponents);
propagateMap(ctx, mergeCtx);
- mergingComponents.stream().map(ILSMDiskComponent.class::cast).forEach(mergeCtx.getComponentsToBeMerged()::add);
+ mergingComponents.stream().forEach(mergeCtx.getComponentsToBeMerged()::add);
ILSMDiskComponent firstComponent = mergingComponents.get(0);
ILSMDiskComponent lastComponent = mergingComponents.get(mergingComponents.size() - 1);
LSMComponentFileReferences mergeFileRefs = getMergeFileReferences(firstComponent, lastComponent);
@@ -494,16 +495,27 @@
@Override
public final IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex) throws HyracksDataException {
+ return createBulkLoader(fillLevel, verifyInput, numElementsHint, checkIfEmptyIndex, null);
+ }
+
+ @Override
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, Map<String, Object> parameters) throws HyracksDataException {
if (checkIfEmptyIndex && !isEmptyIndex()) {
throw HyracksDataException.create(ErrorCode.LOAD_NON_EMPTY_INDEX);
}
- return createBulkLoader(fillLevel, verifyInput, numElementsHint);
+ return createBulkLoader(fillFactor, verifyInput, numElementsHint, parameters);
}
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
- throws HyracksDataException {
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
+ Map<String, Object> parameters) throws HyracksDataException {
AbstractLSMIndexOperationContext opCtx = createOpContext(NoOpIndexAccessParameters.INSTANCE);
- LoadOperation loadOp = new LoadOperation(ioOpCallback, getIndexIdentifier());
+ opCtx.setParameters(parameters);
+ LSMComponentFileReferences componentFileRefs = fileManager.getRelFlushFileReference();
+ LoadOperation loadOp = new LoadOperation(componentFileRefs, ioOpCallback, getIndexIdentifier(), parameters);
+ loadOp.setNewComponent(createDiskComponent(bulkLoadComponentFactory,
+ componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), true));
ioOpCallback.scheduled(loadOp);
opCtx.setIoOperation(loadOp);
return new LSMIndexDiskComponentBulkLoader(this, opCtx, fillLevel, verifyInput, numElementsHint);
@@ -679,7 +691,7 @@
@Override
public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> lsmComponents,
- boolean bulkload, ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
+ ReplicationOperation operation, LSMOperationType opType) throws HyracksDataException {
//get set of files to be replicated for this component
Set<String> componentFiles = new HashSet<>();
@@ -689,7 +701,7 @@
}
ReplicationExecutionType executionType;
- if (bulkload) {
+ if (opType == LSMOperationType.LOAD) {
executionType = ReplicationExecutionType.SYNC;
} else {
executionType = ReplicationExecutionType.ASYNC;
@@ -796,6 +808,30 @@
}
@Override
+ public void resetCurrentComponentIndex() {
+ synchronized (lsmHarness.getOperationTracker()) {
+ // validate no reader in any of the memory components and that all of them are INVALID
+ for (ILSMMemoryComponent c : memoryComponents) {
+ if (c.getReaderCount() > 0) {
+ throw new IllegalStateException(
+ "Attempt to reset current component index while readers are inside the components. " + c);
+ }
+ if (c.getState() != ComponentState.INACTIVE) {
+ throw new IllegalStateException(
+ "Attempt to reset current component index while a component is not INACTIVE. " + c);
+ }
+ }
+ currentMutableComponentId.set(0);
+ memoryComponents.get(0);
+ try {
+ memoryComponents.get(0).resetId(null, true);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
public final ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException {
ILSMIndexAccessor accessor = operation.getAccessor();
ILSMIndexOperationContext opCtx = accessor.getOpContext();
@@ -840,7 +876,8 @@
}
}
- protected String getIndexIdentifier() {
+ @Override
+ public String getIndexIdentifier() {
return fileManager.getBaseDir().getAbsolutePath();
}
@@ -864,5 +901,4 @@
protected abstract ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException;
protected abstract ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException;
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 9ee1ff8..ab00dff 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -339,7 +339,9 @@
LOGGER.log(Level.INFO, "Component Id was reset from " + this.componentId + " to " + componentId);
}
this.componentId = componentId;
- LSMComponentIdUtils.persist(this.componentId, metadata);
+ if (componentId != null) {
+ LSMComponentIdUtils.persist(this.componentId, metadata);
+ }
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index 232b42c..6e0606a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -25,8 +25,9 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
-import org.apache.hyracks.util.annotations.CriticalPath;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import org.apache.hyracks.util.annotations.CriticalPath;
/**
* Class encapsulates a chain of operations, happening during an LSM disk component bulkload
@@ -51,6 +52,7 @@
bulkloaderChain.add(bulkloader);
}
+ @SuppressWarnings("squid:S1181")
@Override
@CriticalPath
public void add(ITupleReference tuple) throws HyracksDataException {
@@ -60,7 +62,8 @@
for (int i = 0; i < bulkloadersCount; i++) {
t = bulkloaderChain.get(i).add(t);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
+ operation.setFailure(e);
cleanupArtifacts();
throw e;
}
@@ -69,6 +72,7 @@
}
}
+ @SuppressWarnings("squid:S1181")
@Override
@CriticalPath
public void delete(ITupleReference tuple) throws HyracksDataException {
@@ -78,7 +82,8 @@
for (int i = 0; i < bulkloadersCount; i++) {
t = bulkloaderChain.get(i).delete(t);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
+ operation.setFailure(e);
cleanupArtifacts();
throw e;
}
@@ -112,6 +117,7 @@
@Override
public void abort() throws HyracksDataException {
+ operation.setStatus(LSMIOOperationStatus.FAILURE);
for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) {
lsmOperation.abort();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
index ced04e7..4c2ddb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java
@@ -86,7 +86,7 @@
@Override
public ILSMComponentId getId() {
- return LSMComponentId.MISSING_COMPONENT_ID;
+ return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
}
@Override
@@ -156,4 +156,9 @@
public void schedule(LSMIOOperationType ioOperationType) throws HyracksDataException {
// Do nothing
}
+
+ @Override
+ public int getReaderCount() {
+ return 0;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index 1ca91b3..ab70ba1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@ -122,7 +122,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add((ILSMDiskComponent) c);
- lsmIndex.scheduleReplication(null, componentsToBeReplicated, false,
+ lsmIndex.scheduleReplication(null, componentsToBeReplicated,
ReplicationOperation.DELETE, opType);
}
((ILSMDiskComponent) c).deactivateAndDestroy();
@@ -142,7 +142,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(newComponent);
- triggerReplication(componentsToBeReplicated, false, opType);
+ triggerReplication(componentsToBeReplicated, opType);
}
mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
@@ -200,7 +200,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(c);
- triggerReplication(componentsToBeReplicated, true, LSMOperationType.MERGE);
+ triggerReplication(componentsToBeReplicated, LSMOperationType.LOAD);
}
// Enter the component
enterComponent(c);
@@ -292,7 +292,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(diskComponent);
- lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, null);
+ lsmIndex.scheduleReplication(null, componentsToBeReplicated, ReplicationOperation.DELETE, null);
}
diskComponent.deactivateAndDestroy();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
index b61355f..b2a2e48 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FlushOperation.java
@@ -23,11 +23,10 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-public abstract class FlushOperation extends AbstractIoOperation implements Comparable<ILSMIOOperation> {
+public abstract class FlushOperation extends AbstractIoOperation {
public FlushOperation(ILSMIndexAccessor accessor, FileReference target, ILSMIOOperationCallback callback,
String indexIdentifier) {
@@ -65,14 +64,6 @@
}
@Override
- public int compareTo(ILSMIOOperation o) {
- if (o instanceof FlushOperation) {
- return target.getFile().getName().compareTo(((FlushOperation) o).getTarget().getFile().getName());
- }
- return -1;
- }
-
- @Override
public boolean equals(Object o) {
return (o instanceof FlushOperation)
&& Objects.equals(target.getFile().getName(), ((FlushOperation) o).target.getFile().getName());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
index 442af56..c7990a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java
@@ -25,11 +25,11 @@
public static final long NOT_FOUND = -1;
- // Use to handle legacy datasets which do not have the component Id
- public static final ILSMComponentId MISSING_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
+ // Used to represent an empty index with no components
+ public static final LSMComponentId EMPTY_INDEX_LAST_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND);
// A default component id used for bulk loaded component
- public static final ILSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
+ public static final LSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0);
private long minId;
@@ -46,10 +46,12 @@
this.maxId = maxId;
}
+ @Override
public long getMinId() {
return this.minId;
}
+ @Override
public long getMaxId() {
return this.maxId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 53e6a57..7a87a13 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -247,7 +247,7 @@
try {
//schedule a replication job to delete these inactive disk components from replicas
if (replicationEnabled) {
- lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, false,
+ lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted,
ReplicationOperation.DELETE, opType);
}
for (ILSMDiskComponent c : inactiveDiskComponentsToBeDeleted) {
@@ -271,10 +271,12 @@
// newComponent is null if the flush op. was not performed.
if (!failedOperation && newComponent != null) {
lsmIndex.addDiskComponent(newComponent);
+                // TODO: the following should also replicate the component id,
+                // even if the component is empty
if (replicationEnabled && newComponent != EmptyComponent.INSTANCE) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(newComponent);
- triggerReplication(componentsToBeReplicated, false, opType);
+ triggerReplication(componentsToBeReplicated, opType);
}
mergePolicy.diskComponentAdded(lsmIndex, false);
}
@@ -286,7 +288,7 @@
if (replicationEnabled && newComponent != EmptyComponent.INSTANCE) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(newComponent);
- triggerReplication(componentsToBeReplicated, false, opType);
+ triggerReplication(componentsToBeReplicated, opType);
}
mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
}
@@ -619,7 +621,7 @@
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(c);
- triggerReplication(componentsToBeReplicated, true, LSMOperationType.MERGE);
+ triggerReplication(componentsToBeReplicated, LSMOperationType.LOAD);
}
mergePolicy.diskComponentAdded(lsmIndex, false);
}
@@ -630,20 +632,20 @@
return opTracker;
}
- protected void triggerReplication(List<ILSMDiskComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+ protected void triggerReplication(List<ILSMDiskComponent> lsmComponents, LSMOperationType opType)
throws HyracksDataException {
ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleReplication(lsmComponents, bulkload, opType);
+ accessor.scheduleReplication(lsmComponents, opType);
}
@Override
public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> lsmComponents,
- boolean bulkload, LSMOperationType opType) throws HyracksDataException {
+ LSMOperationType opType) throws HyracksDataException {
//enter the LSM components to be replicated to prevent them from being deleted until they are replicated
if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) {
return;
}
- lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE, opType);
+ lsmIndex.scheduleReplication(ctx, lsmComponents, ReplicationOperation.REPLICATE, opType);
}
@Override
@@ -740,7 +742,6 @@
ILSMIOOperation ioOperation = null;
synchronized (opTracker) {
waitForFlushesAndMerges();
- ensureNoFailedFlush();
// We always start with the memory component
ILSMMemoryComponent memComponent = lsmIndex.getCurrentMemoryComponent();
deleteMemoryComponent = predicate.test(memComponent);
@@ -769,9 +770,6 @@
List<ILSMDiskComponent> toBeDeleted;
synchronized (opTracker) {
waitForFlushesAndMerges();
- // Ensure that current memory component is empty and that no failed flushes happened so far
- // This is a workaround until ASTERIXDB-2106 is fixed
- ensureNoFailedFlush();
List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents();
for (ILSMDiskComponent component : diskComponents) {
if (predicate.test(component)) {
@@ -804,21 +802,6 @@
}
}
- /**
- * This can only be called in the steady state where:
- * 1. no scheduled flushes
- * 2. no incoming data
- *
- * @throws HyracksDataException
- */
- private void ensureNoFailedFlush() throws HyracksDataException {
- for (ILSMMemoryComponent memoryComponent : lsmIndex.getMemoryComponents()) {
- if (memoryComponent.getState() == ComponentState.READABLE_UNWRITABLE) {
- throw HyracksDataException.create(ErrorCode.A_FLUSH_OPERATION_HAS_FAILED);
- }
- }
- }
-
private void waitForFlushesAndMerges() throws HyracksDataException {
while (flushingOrMerging()) {
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 416106d..10074f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -35,33 +35,41 @@
boolean verifyInput, long numElementsHint) throws HyracksDataException {
this.lsmIndex = lsmIndex;
this.opCtx = opCtx;
- opCtx.getIoOperation().setNewComponent(lsmIndex.createBulkLoadTarget());
this.componentBulkLoader = opCtx.getIoOperation().getNewComponent().createBulkLoader(opCtx.getIoOperation(),
fillFactor, verifyInput, numElementsHint, false, true, true);
- lsmIndex.getIOOperationCallback().beforeOperation(opCtx.getIoOperation());
}
public ILSMDiskComponent getComponent() {
return opCtx.getIoOperation().getNewComponent();
}
+ @SuppressWarnings("squid:S1181")
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- componentBulkLoader.add(tuple);
+ try {
+ componentBulkLoader.add(tuple);
+ } catch (Throwable th) {
+ opCtx.getIoOperation().setFailure(th);
+ throw th;
+ }
}
+ @SuppressWarnings("squid:S1181")
public void delete(ITupleReference tuple) throws HyracksDataException {
- componentBulkLoader.delete(tuple);
+ try {
+ componentBulkLoader.delete(tuple);
+ } catch (Throwable th) {
+ opCtx.getIoOperation().setFailure(th);
+ throw th;
+ }
}
@Override
public void end() throws HyracksDataException {
try {
try {
+ lsmIndex.getIOOperationCallback().afterOperation(opCtx.getIoOperation());
componentBulkLoader.end();
- if (opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
- lsmIndex.getIOOperationCallback().afterOperation(opCtx.getIoOperation());
- }
} catch (Throwable th) { // NOSONAR Must not call afterFinalize without setting failure
opCtx.getIoOperation().setStatus(LSMIOOperationStatus.FAILURE);
opCtx.getIoOperation().setFailure(th);
@@ -69,7 +77,8 @@
} finally {
lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
}
- if (opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
+ if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
+ && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation().getNewComponent());
}
} finally {
@@ -80,7 +89,6 @@
@Override
public void abort() throws HyracksDataException {
opCtx.getIoOperation().setStatus(LSMIOOperationStatus.FAILURE);
- opCtx.getIoOperation().setNewComponent(null);
try {
try {
componentBulkLoader.abort();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index f2c8d35..8412b8c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -148,12 +148,12 @@
}
@Override
- public void scheduleReplication(List<ILSMDiskComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+ public void scheduleReplication(List<ILSMDiskComponent> lsmComponents, LSMOperationType opType)
throws HyracksDataException {
ctx.setOperation(IndexOperation.REPLICATE);
ctx.getComponentsToBeReplicated().clear();
ctx.getComponentsToBeReplicated().addAll(lsmComponents);
- lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload, opType);
+ lsmHarness.scheduleReplication(ctx, lsmComponents, opType);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LoadOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LoadOperation.java
index 3d63a6a..21c52d0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LoadOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LoadOperation.java
@@ -18,13 +18,21 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.Map;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
public class LoadOperation extends AbstractIoOperation {
- public LoadOperation(ILSMIOOperationCallback callback, String indexIdentifier) {
- super(null, null, callback, indexIdentifier);
+ private final LSMComponentFileReferences fileReferences;
+ private final Map<String, Object> parameters;
+
+ public LoadOperation(LSMComponentFileReferences fileReferences, ILSMIOOperationCallback callback,
+ String indexIdentifier, Map<String, Object> parameters) {
+ super(null, fileReferences.getInsertIndexFileReference(), callback, indexIdentifier);
+ this.fileReferences = fileReferences;
+ this.parameters = parameters;
}
@Override
@@ -39,11 +47,16 @@
@Override
public LSMComponentFileReferences getComponentFiles() {
- return null;
+ return fileReferences;
}
@Override
public void sync() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Map<String, Object> getParameters() {
+ return parameters;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
index 6d7e7ec..f57c4ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.Map;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IODeviceHandle;
@@ -26,6 +28,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
public class NoOpIoOperation implements ILSMIOOperation {
public static final NoOpIoOperation INSTANCE = new NoOpIoOperation();
@@ -113,4 +116,14 @@
// No Op
}
+ @Override
+ public void addCompleteListener(IoOperationCompleteListener listener) {
+ listener.completed(this);
+ }
+
+ @Override
+ public Map<String, Object> getParameters() {
+ return null;
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
index 5ada349..f1172f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.Map;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IODeviceHandle;
@@ -26,6 +28,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
import org.apache.hyracks.util.trace.ITracer;
import org.apache.hyracks.util.trace.ITracer.Scope;
import org.apache.hyracks.util.trace.TraceUtils;
@@ -54,10 +57,7 @@
if (tracer.isEnabled(traceCategory)) {
tracer.instant("schedule-" + ioOpName, traceCategory, Scope.p,
"{\"path\": \"" + ioOp.getTarget().getRelativePath() + "\"}");
- }
- if (tracer.isEnabled(traceCategory)) {
- return ioOp instanceof Comparable ? new ComparableTracedIOOperation(ioOp, tracer, traceCategory)
- : new TracedIOOperation(ioOp, tracer, traceCategory);
+ return new TracedIOOperation(ioOp, tracer, traceCategory);
}
return ioOp;
}
@@ -152,33 +152,14 @@
public void sync() throws InterruptedException {
ioOp.sync();
}
+
+ @Override
+ public void addCompleteListener(IoOperationCompleteListener listener) {
+ ioOp.addCompleteListener(listener);
+ }
+
+ @Override
+ public Map<String, Object> getParameters() {
+ return ioOp.getParameters();
+ }
}
-
-class ComparableTracedIOOperation extends TracedIOOperation implements Comparable<ILSMIOOperation> {
-
- protected ComparableTracedIOOperation(ILSMIOOperation ioOp, ITracer trace, long traceCategory) {
- super(ioOp, trace, traceCategory);
- }
-
- @Override
- public int hashCode() {
- return this.ioOp.hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- return other instanceof ILSMIOOperation && compareTo((ILSMIOOperation) other) == 0;
- }
-
- @Override
- public int compareTo(ILSMIOOperation other) {
- final ILSMIOOperation myIoOp = this.ioOp;
- if (myIoOp instanceof Comparable && other instanceof ComparableTracedIOOperation) {
- return ((Comparable) myIoOp).compareTo(((ComparableTracedIOOperation) other).getIoOp());
- }
- LOGGER.warn("Comparing ioOps of type " + myIoOp.getClass().getSimpleName() + " and "
- + other.getClass().getSimpleName() + " in " + getClass().getSimpleName());
- return Integer.signum(hashCode() - other.hashCode());
- }
-
-}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
index 6d4b0a7..1779527 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
@@ -25,8 +25,11 @@
import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class LSMComponentIdUtils {
+ private static final Logger LOGGER = LogManager.getLogger();
private static final MutableArrayValueReference COMPONENT_ID_MIN_KEY =
new MutableArrayValueReference("Component_Id_Min".getBytes());
@@ -43,7 +46,9 @@
long minId = ComponentUtils.getLong(metadata, COMPONENT_ID_MIN_KEY, LSMComponentId.NOT_FOUND, buffer);
long maxId = ComponentUtils.getLong(metadata, COMPONENT_ID_MAX_KEY, LSMComponentId.NOT_FOUND, buffer);
if (minId == LSMComponentId.NOT_FOUND || maxId == LSMComponentId.NOT_FOUND) {
- return LSMComponentId.MISSING_COMPONENT_ID;
+ LOGGER.warn("Invalid component id {} was persisted to a component metadata",
+ LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID);
+ return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
} else {
return new LSMComponentId(minId, maxId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index a6471f8..8e39b62 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -104,12 +104,12 @@
}
@Override
- public void scheduleReplication(List<ILSMDiskComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+ public void scheduleReplication(List<ILSMDiskComponent> lsmComponents, LSMOperationType opType)
throws HyracksDataException {
ctx.setOperation(IndexOperation.REPLICATE);
ctx.getComponentsToBeReplicated().clear();
ctx.getComponentsToBeReplicated().addAll(lsmComponents);
- lsmHarness.scheduleReplication(ctx, lsmComponents, bulkload, opType);
+ lsmHarness.scheduleReplication(ctx, lsmComponents, opType);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
index f3162c9..f902153 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -30,7 +31,6 @@
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomCalculations;
import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilterSpecification;
import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeBulkLoader;
import org.apache.hyracks.storage.am.common.api.IIndexOperationContext;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.api.ITwoPCIndexBulkLoader;
@@ -48,8 +48,10 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.ChainedLSMDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.impls.ExternalIndexHarness;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import org.apache.hyracks.storage.am.lsm.common.impls.LoadOperation;
import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
@@ -433,132 +435,100 @@
// For initial load
@Override
- public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
- throws HyracksDataException {
- return new LSMTwoPCRTreeBulkLoader(fillLevel, verifyInput, 0, false);
+ public IIndexBulkLoader createBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
+ Map<String, Object> parameters) throws HyracksDataException {
+ return new LSMTwoPCRTreeBulkLoader(fillLevel, verifyInput, 0, false, parameters);
}
// For transaction bulk load <- could consolidate with the above method ->
@Override
- public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint)
- throws HyracksDataException {
- return new LSMTwoPCRTreeBulkLoader(fillLevel, verifyInput, numElementsHint, true);
+ public IIndexBulkLoader createTransactionBulkLoader(float fillLevel, boolean verifyInput, long numElementsHint,
+ Map<String, Object> parameters) throws HyracksDataException {
+ return new LSMTwoPCRTreeBulkLoader(fillLevel, verifyInput, numElementsHint, true, parameters);
}
// The bulk loader used for both initial loading and transaction
// modifications
public class LSMTwoPCRTreeBulkLoader implements IIndexBulkLoader, ITwoPCIndexBulkLoader {
private final ILSMDiskComponent component;
- private final IIndexBulkLoader rtreeBulkLoader;
- private final BTreeBulkLoader btreeBulkLoader;
- private final IIndexBulkLoader builder;
- private boolean cleanedUpArtifacts = false;
- private boolean isEmptyComponent = true;
- private boolean endedBloomFilterLoad = false;
private final boolean isTransaction;
+ private final LoadOperation loadOp;
+ private final ChainedLSMDiskComponentBulkLoader componentBulkLoader;
public LSMTwoPCRTreeBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
- boolean isTransaction) throws HyracksDataException {
+ boolean isTransaction, Map<String, Object> parameters) throws HyracksDataException {
this.isTransaction = isTransaction;
// Create the appropriate target
+ LSMComponentFileReferences componentFileRefs;
if (isTransaction) {
- component = createTransactionTarget();
+ try {
+ componentFileRefs = fileManager.getNewTransactionFileReference();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ component = createDiskComponent(componentFactory, componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getDeleteIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), true);
} else {
- component = createBulkLoadTarget();
+ componentFileRefs = fileManager.getRelFlushFileReference();
+ component =
+ createDiskComponent(bulkLoadComponentFactory, componentFileRefs.getInsertIndexFileReference(),
+ componentFileRefs.getDeleteIndexFileReference(),
+ componentFileRefs.getBloomFilterFileReference(), true);
}
- // Create the three loaders
- rtreeBulkLoader = ((LSMRTreeDiskComponent) component).getIndex().createBulkLoader(fillFactor, verifyInput,
- numElementsHint, false);
- btreeBulkLoader = (BTreeBulkLoader) ((LSMRTreeDiskComponent) component).getBuddyIndex()
- .createBulkLoader(fillFactor, verifyInput, numElementsHint, false);
- int maxBucketsPerElement = BloomCalculations.maxBucketsPerElement(numElementsHint);
- BloomFilterSpecification bloomFilterSpec =
- BloomCalculations.computeBloomSpec(maxBucketsPerElement, bloomFilterFalsePositiveRate);
- builder = ((LSMRTreeDiskComponent) component).getBloomFilter().createBuilder(numElementsHint,
- bloomFilterSpec.getNumHashes(), bloomFilterSpec.getNumBucketsPerElements());
+ loadOp = new LoadOperation(componentFileRefs, ioOpCallback, getIndexIdentifier(), parameters);
+ loadOp.setNewComponent(component);
+ ioOpCallback.scheduled(loadOp);
+ ioOpCallback.beforeOperation(loadOp);
+ componentBulkLoader =
+ component.createBulkLoader(loadOp, fillFactor, verifyInput, numElementsHint, false, true, false);
}
@Override
public void add(ITupleReference tuple) throws HyracksDataException {
- try {
- rtreeBulkLoader.add(tuple);
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
- }
-
- // This is made public in case of a failure, it is better to delete all
- // created artifacts.
- public void cleanupArtifacts() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- cleanedUpArtifacts = true;
- ((LSMRTreeDiskComponent) component).deactivateAndDestroy();
- }
+ componentBulkLoader.add(tuple);
}
@Override
public void end() throws HyracksDataException {
- if (!cleanedUpArtifacts) {
- if (!endedBloomFilterLoad) {
- builder.end();
- endedBloomFilterLoad = true;
+ try {
+ ioOpCallback.afterOperation(loadOp);
+ componentBulkLoader.end();
+ if (component.getComponentSize() > 0) {
+ if (isTransaction) {
+ // Since this is a transaction component, validate and
+                        // deactivate. It could later be added or deleted.
+ component.markAsValid(durable);
+ ioOpCallback.afterFinalize(loadOp);
+ component.deactivate();
+ } else {
+ ioOpCallback.afterFinalize(loadOp);
+ getHarness().addBulkLoadedComponent(component);
+ }
}
- rtreeBulkLoader.end();
- btreeBulkLoader.end();
- if (isEmptyComponent) {
- cleanupArtifacts();
- } else if (isTransaction) {
- // Since this is a transaction component, validate and
- // deactivate. it could later be added or deleted
- component.markAsValid(durable);
- component.deactivate();
- } else {
- getHarness().addBulkLoadedComponent(component);
- }
+ } finally {
+ ioOpCallback.completed(loadOp);
}
}
@Override
public void delete(ITupleReference tuple) throws HyracksDataException {
- try {
- btreeBulkLoader.add(tuple);
- builder.add(tuple);
- } catch (Exception e) {
- cleanupArtifacts();
- throw e;
- }
- if (isEmptyComponent) {
- isEmptyComponent = false;
- }
+ componentBulkLoader.delete(tuple);
}
@Override
- public void abort() {
+ public void abort() throws HyracksDataException {
try {
- cleanupArtifacts();
- } catch (Exception e) {
-
+ try {
+ componentBulkLoader.abort();
+ } finally {
+ ioOpCallback.afterFinalize(loadOp);
+ }
+ } finally {
+ ioOpCallback.completed(loadOp);
}
}
-
- // This method is used to create a target for a bulk modify operation. This
- // component must then eventually be either committed or deleted
- private ILSMDiskComponent createTransactionTarget() throws HyracksDataException {
- LSMComponentFileReferences componentFileRefs;
- try {
- componentFileRefs = fileManager.getNewTransactionFileReference();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- return createDiskComponent(componentFactory, componentFileRefs.getInsertIndexFileReference(),
- componentFileRefs.getDeleteIndexFileReference(), componentFileRefs.getBloomFilterFileReference(),
- true);
- }
}
// The only change the the schedule merge is the method used to create the