[ASTERIXDB-2188] Ensure recovery of component ids
- user model changes: no
- storage format changes: yes.
Flush log record format changes.
- interface changes: no
Details:
- Add the flushed component's min/max ids to the flush log record.
Upon seeing a flush log record during recovery, schedule a flush
for every index in the record's partition whose max disk LSN is
smaller than the record's LSN, so that component ids are maintained
correctly even after failed flushes (see the sketch below).
- Add a test case verifying the recovery of component ids.
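A minimal, self-contained sketch of the redo rule above, for illustration
only (FlushLogPayload, IndexState, and FlushRedoSketch are hypothetical
stand-ins; the actual logic lives in RecoveryManager and uses the
ILSMIndex/ILSMComponentId APIs changed in this patch):

    // Hypothetical sketch -- simplified stand-ins for the real classes.
    final class FlushLogPayload {
        long lsn;               // LSN of the FLUSH log record
        int datasetId;
        int resourcePartition;
        long componentMinId;    // min id of the flushed component (new field)
        long componentMaxId;    // max id of the flushed component (new field)
    }

    final class IndexState {
        long maxDiskLsn;               // largest LSN already persisted for this index
        boolean mutableComponentEmpty; // whether the current memory component is empty
        long memMinId, memMaxId;       // id of the current memory component
    }

    final class FlushRedoSketch {
        // nextMinId/nextMaxId come from the partition's refreshed id generator.
        static void redo(FlushLogPayload log, IndexState index, long nextMinId, long nextMaxId) {
            if (log.lsn > index.maxDiskLsn && !index.mutableComponentEmpty) {
                // this index missed the flush: flush its memory component now,
                // stamped with the component id recorded in the log
                index.memMinId = log.componentMinId;
                index.memMaxId = log.componentMaxId;
                scheduleFlush(index);
            } else {
                // the flush already reached disk (or there is nothing to flush):
                // force the memory component onto the refreshed next id so all
                // indexes of the partition stay aligned
                index.memMinId = nextMinId;
                index.memMaxId = nextMaxId;
            }
        }

        static void scheduleFlush(IndexState index) {
            // placeholder: the real code goes through ILSMIndexAccessor.scheduleFlush(...)
        }
    }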
Change-Id: I8c1fc2b209cfb9d3dafa216771d2b7032eb99e75
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2408
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 273d832..a8d8610 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -44,6 +44,8 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -69,9 +71,14 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId.IdCompareResult;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.logging.log4j.Level;
@@ -284,6 +291,7 @@
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
ILogRecord logRecord = null;
+ ILSMComponentIdGenerator idGenerator = null;
try {
logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -363,10 +371,51 @@
}
}
break;
+ case LogType.FLUSH:
+ int partition = logRecord.getResourcePartition();
+ if (partitions.contains(partition)) {
+ int datasetId = logRecord.getDatasetId();
+ idGenerator = datasetLifecycleManager.getComponentIdGenerator(datasetId, partition);
+ if (idGenerator == null) {
+ // it's possible this dataset has been dropped
+ logRecord = logReader.next();
+ continue;
+ }
+ idGenerator.refresh();
+ DatasetInfo dsInfo = datasetLifecycleManager.getDatasetInfo(datasetId);
+ // we only need to flush open indexes here (opened by previous update records)
+ // if an index has no ongoing updates, then its memory component must be empty
+ // and there is nothing to flush
+ for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
+ if (iInfo.isOpen()) {
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(iInfo.getResourceId());
+ index = iInfo.getIndex();
+ AbstractLSMIOOperationCallback ioCallback =
+ (AbstractLSMIOOperationCallback) index.getIOOperationCallback();
+ if (logRecord.getLSN() > maxDiskLastLsn
+ && !index.isCurrentMutableComponentEmpty()) {
+ // schedule flush
+ ioCallback.updateLastLSN(logRecord.getLSN());
+ redoFlush(index, logRecord);
+ redoCount++;
+ } else {
+ if (index.isMemoryComponentsAllocated()) {
+ // if the memory component has been allocated, we
+ // force it to receive the same Id
+ index.getCurrentMemoryComponent().resetId(idGenerator.getId(), true);
+ } else {
+ // otherwise, we refresh the id stored in ioCallback
+ // to ensure the memory component receives the correct id upon activation
+ ioCallback.forceRefreshNextId();
+ }
+ }
+ }
+ }
+ }
+ break;
case LogType.JOB_COMMIT:
case LogType.ENTITY_COMMIT:
case LogType.ABORT:
- case LogType.FLUSH:
case LogType.WAIT:
case LogType.MARKER:
//do nothing
@@ -736,6 +785,23 @@
}
}
+ private static void redoFlush(ILSMIndex index, ILogRecord logRecord) throws HyracksDataException {
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ long minId = logRecord.getFlushingComponentMinId();
+ long maxId = logRecord.getFlushingComponentMaxId();
+ ILSMComponentId id = new LSMComponentId(minId, maxId);
+ if (!index.getDiskComponents().isEmpty()) {
+ ILSMDiskComponent diskComponent = index.getDiskComponents().get(0);
+ ILSMComponentId maxDiskComponentId = diskComponent.getId();
+ if (maxDiskComponentId.compareTo(id) != IdCompareResult.LESS_THAN) {
+ throw new IllegalStateException("Illegal state of component Id. Max disk component Id "
+ + maxDiskComponentId + " should be less than redo flush component Id " + id);
+ }
+ }
+ index.getCurrentMemoryComponent().resetId(id, true);
+ accessor.scheduleFlush(index.getIOOperationCallback());
+ }
+
private class JobEntityCommits {
private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
private final long txnId;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 6c4d068..f99429c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -121,8 +121,8 @@
protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
protected static TransactionProperties txnProperties;
- private static final boolean cleanupOnStart = true;
- private static final boolean cleanupOnStop = true;
+ private static final boolean CLEANUP_ON_START = true;
+ private static final boolean CLEANUP_ON_STOP = true;
// Constants
public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
@@ -142,6 +142,10 @@
}
public void init() throws Exception {
+ init(CLEANUP_ON_START);
+ }
+
+ public void init(boolean cleanupOnStart) throws Exception {
try {
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
@@ -157,6 +161,10 @@
}
public void deInit() throws Exception {
+ deInit(CLEANUP_ON_STOP);
+ }
+
+ public void deInit(boolean cleanupOnStop) throws Exception {
ExternalUDFLibrarian.removeLibraryDir();
ExecutionTestUtil.tearDown(cleanupOnStop);
}
@@ -165,6 +173,10 @@
options.addAll(opts);
}
+ public void clearOpts() {
+ options.clear();
+ }
+
public TxnId getTxnJobId(IHyracksTaskContext ctx) {
return getTxnJobId(ctx.getJobletContext().getJobId());
}
@@ -241,10 +253,15 @@
SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider);
+
+ IModificationOperationCallbackFactory secondaryModCallbackFactory =
+ dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex,
+ IndexOperation.INSERT, primaryKeyIndexes);
+
LSMInsertDeleteOperatorNodePushable secondaryInsertOp =
new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
secondaryIndexInfo.insertFieldsPermutations, secondaryIndexInfo.rDesc, op, false,
- secondaryIndexHelperFactory, NoOpOperationCallbackFactory.INSTANCE, null);
+ secondaryIndexHelperFactory, secondaryModCallbackFactory, null);
assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc);
CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(),
secondaryIndexInfo.primaryKeyIndexes, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
@@ -465,12 +482,12 @@
}
public static class SecondaryIndexInfo {
- private int[] primaryKeyIndexes;
- private PrimaryIndexInfo primaryIndexInfo;
- private Index secondaryIndex;
- private ConstantFileSplitProvider fileSplitProvider;
- private RecordDescriptor rDesc;
- private int[] insertFieldsPermutations;
+ private final int[] primaryKeyIndexes;
+ private final PrimaryIndexInfo primaryIndexInfo;
+ private final Index secondaryIndex;
+ private final ConstantFileSplitProvider fileSplitProvider;
+ private final RecordDescriptor rDesc;
+ private final int[] insertFieldsPermutations;
public SecondaryIndexInfo(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex) {
this.primaryIndexInfo = primaryIndexInfo;
@@ -504,20 +521,20 @@
}
public static class PrimaryIndexInfo {
- private Dataset dataset;
- private IAType[] primaryKeyTypes;
- private ARecordType recordType;
- private ARecordType metaType;
- private ILSMMergePolicyFactory mergePolicyFactory;
- private Map<String, String> mergePolicyProperties;
- private int primaryIndexNumOfTupleFields;
- private ITypeTraits[] primaryIndexTypeTraits;
- private ISerializerDeserializer<?>[] primaryIndexSerdes;
- private ConstantFileSplitProvider fileSplitProvider;
- private RecordDescriptor rDesc;
- private int[] primaryIndexInsertFieldsPermutations;
- private int[] primaryKeyIndexes;
- private Index index;
+ private final Dataset dataset;
+ private final IAType[] primaryKeyTypes;
+ private final ARecordType recordType;
+ private final ARecordType metaType;
+ private final ILSMMergePolicyFactory mergePolicyFactory;
+ private final Map<String, String> mergePolicyProperties;
+ private final int primaryIndexNumOfTupleFields;
+ private final ITypeTraits[] primaryIndexTypeTraits;
+ private final ISerializerDeserializer<?>[] primaryIndexSerdes;
+ private final ConstantFileSplitProvider fileSplitProvider;
+ private final RecordDescriptor rDesc;
+ private final int[] primaryIndexInsertFieldsPermutations;
+ private final int[] primaryKeyIndexes;
+ private final Index index;
public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
@@ -563,6 +580,10 @@
return index;
}
+ public Dataset getDataset() {
+ return dataset;
+ }
+
public IRecordDescriptorProvider getInsertRecordDescriptorProvider() {
IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index c6232f5..9ef531e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -282,7 +283,7 @@
lsmAccessor.deleteComponents(
c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
// now that the rollback has completed, we will unblock the search
- lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
// search now and ensure
@@ -303,7 +304,7 @@
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
// now that the rollback has completed, we will unblock the search
- lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
lsmBtree.allowSearch(1);
Assert.assertTrue(secondSearcher.result());
StorageTestUtils.searchAndAssertCount(nc, PARTITION,
@@ -477,7 +478,7 @@
Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
//unblock the flush
lsmBtree.allowFlush(1);
- lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
// ensure current mem component is not modified
@@ -535,7 +536,7 @@
// now that we enetered, we will rollback
Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
// The rollback will be waiting for the flush to complete
- lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
//unblock the flush
@@ -606,7 +607,7 @@
// unblock the merge
lsmBtree.allowMerge(1);
// unblock the search
- lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
rollerback.complete();
@@ -673,7 +674,7 @@
// now that we enetered, we will rollback
Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
// unblock the search
- lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
// even though rollback has been called, it is still waiting for the merge to complete
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
new file mode 100644
index 0000000..b10e9b1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.bootstrap.TestNodeController.SecondaryIndexInfo;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.common.config.StorageProperties.Option;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LSMFlushRecoveryTest {
+
+ private static TestNodeController nc;
+ private static Dataset dataset;
+ private static PrimaryIndexInfo primaryIndexInfo;
+ private static SecondaryIndexInfo secondaryIndexInfo;
+ private static TestLsmBtree primaryIndex;
+ private static TestLsmBtree secondaryIndex;
+ private static Index secondaryIndexEntity;
+ private static NCAppRuntimeContext ncAppCtx;
+ private static IDatasetLifecycleManager dsLifecycleMgr;
+
+ private static IHyracksTaskContext ctx;
+ private static IIndexDataflowHelper primaryIndexDataflowHelper;
+ private static IIndexDataflowHelper secondaryIndexDataflowHelper;
+ private static ITransactionContext txnCtx;
+ private static LSMInsertDeleteOperatorNodePushable insertOp;
+ private static final int PARTITION = 0;
+ private static TupleGenerator tupleGenerator;
+
+ private static final String SECONDARY_INDEX_NAME = "TestIdx";
+ private static final IndexType SECONDARY_INDEX_TYPE = IndexType.BTREE;
+ private static final List<List<String>> SECONDARY_INDEX_FIELD_NAMES =
+ Arrays.asList(Arrays.asList(StorageTestUtils.RECORD_TYPE.getFieldNames()[1]));
+ private static final List<Integer> SECONDARY_INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR);
+ private static final List<IAType> SECONDARY_INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+ + File.separator + "resources" + File.separator + "cc.conf";
+ nc = new TestNodeController(configPath, false);
+ }
+
+ @Before
+ public void initializeTest() throws Exception {
+ // initialize NC before each test
+ initializeNc(true);
+ initializeTestCtx();
+ createIndex();
+ readIndex();
+ insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, secondaryIndexEntity);
+ tupleGenerator = StorageTestUtils.getTupleGenerator();
+ }
+
+ @After
+ public void testRecovery() {
+ try {
+ // right now we've inserted 1000 records into the index, and each record is at least 12 bytes.
+ // thus, the memory component size is at least 12KB.
+ List<Pair<IOption, Object>> opts = new ArrayList<>();
+ opts.add(Pair.of(Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, "128MB"));
+ opts.add(Pair.of(Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, "10000"));
+ nc.setOpts(opts);
+ nc.init(false);
+ initializeTestCtx();
+ readIndex();
+ checkComponentIds();
+ insertOp = StorageTestUtils.getInsertPipeline(nc, ctx, secondaryIndexEntity);
+ // insert more records
+ insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+ checkComponentIds();
+
+ dropIndex();
+ // cleanup after each test case
+ nc.deInit(true);
+ nc.clearOpts();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ private void initializeNc(boolean cleanUpOnStart) throws Exception {
+ nc.init(cleanUpOnStart);
+ ncAppCtx = nc.getAppRuntimeContext();
+ dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+ }
+
+ private void createIndex() throws Exception {
+ primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
+ dataset = primaryIndexInfo.getDataset();
+ secondaryIndexEntity = new Index(dataset.getDataverseName(), dataset.getDatasetName(), SECONDARY_INDEX_NAME,
+ SECONDARY_INDEX_TYPE, SECONDARY_INDEX_FIELD_NAMES, SECONDARY_INDEX_FIELD_INDICATORS,
+ SECONDARY_INDEX_FIELD_TYPES, false, false, false, 0);
+ secondaryIndexInfo = nc.createSecondaryIndex(primaryIndexInfo, secondaryIndexEntity,
+ StorageTestUtils.STORAGE_MANAGER, PARTITION);
+ }
+
+ private void initializeTestCtx() throws Exception {
+ JobId jobId = nc.newJobId();
+ ctx = nc.createTestContext(jobId, PARTITION, false);
+ txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ }
+
+ private void readIndex() throws HyracksDataException {
+ IndexDataflowHelperFactory primaryHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+ primaryIndexDataflowHelper = primaryHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
+ primaryIndexDataflowHelper.open();
+ primaryIndex = (TestLsmBtree) primaryIndexDataflowHelper.getIndexInstance();
+ primaryIndexDataflowHelper.close();
+
+ IndexDataflowHelperFactory secondaryHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), secondaryIndexInfo.getFileSplitProvider());
+ secondaryIndexDataflowHelper =
+ secondaryHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
+ secondaryIndexDataflowHelper.open();
+ secondaryIndex = (TestLsmBtree) secondaryIndexDataflowHelper.getIndexInstance();
+ secondaryIndexDataflowHelper.close();
+ }
+
+ private void dropIndex() throws HyracksDataException {
+ primaryIndexDataflowHelper.destroy();
+ secondaryIndexDataflowHelper.destroy();
+ }
+
+ @Test
+ public void testBothFlushSucceed() throws Exception {
+ insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+ // shutdown the server
+ nc.deInit(false);
+ }
+
+ @Test
+ public void testSecondaryFlushFails() throws Exception {
+ insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+
+ primaryIndex.clearFlushCallbacks();
+ secondaryIndex.clearFlushCallbacks();
+
+ Semaphore primaryFlushSemaphore = new Semaphore(0);
+ secondaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+ @Override
+ public void before(Semaphore t) throws HyracksDataException {
+ throw new HyracksDataException("Kill the flush thread");
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+
+ }
+ });
+
+ primaryIndex.addFlushCallback(AllowTestOpCallback.INSTANCE);
+ primaryIndex.addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
+ @Override
+ public void before(Void t) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+ primaryFlushSemaphore.release();
+ }
+ });
+ StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+
+ primaryFlushSemaphore.acquire();
+ List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+ List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents();
+ Assert.assertEquals(primaryComponents.size(), secondaryComponents.size() + 1);
+ // shutdown the NC
+ nc.deInit(false);
+ }
+
+ @Test
+ public void testPrimaryFlushFails() throws Exception {
+ insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+
+ primaryIndex.clearFlushCallbacks();
+ secondaryIndex.clearFlushCallbacks();
+
+ Semaphore secondaryFlushSemaphore = new Semaphore(0);
+
+ primaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+ @Override
+ public void before(Semaphore t) throws HyracksDataException {
+ throw new HyracksDataException("Kill the flush thread");
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+
+ }
+ });
+
+ secondaryIndex.addFlushCallback(AllowTestOpCallback.INSTANCE);
+ secondaryIndex.addIoAfterFinalizeCallback(new ITestOpCallback<Void>() {
+ @Override
+ public void before(Void t) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+ secondaryFlushSemaphore.release();
+ }
+ });
+ StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+
+ secondaryFlushSemaphore.acquire();
+ List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+ List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents();
+ Assert.assertEquals(secondaryComponents.size(), primaryComponents.size() + 1);
+ // shutdown the NC
+ nc.deInit(false);
+ }
+
+ @Test
+ public void testBothFlushFail() throws Exception {
+ insertRecords(StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT);
+
+ primaryIndex.clearFlushCallbacks();
+ secondaryIndex.clearFlushCallbacks();
+
+ Semaphore primaryFlushSemaphore = new Semaphore(0);
+ Semaphore secondaryFlushSemaphore = new Semaphore(0);
+
+ primaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+ @Override
+ public void before(Semaphore t) throws HyracksDataException {
+ primaryFlushSemaphore.release();
+ throw new HyracksDataException("Kill the flush thread");
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+
+ }
+ });
+
+ secondaryIndex.addFlushCallback(new ITestOpCallback<Semaphore>() {
+ @Override
+ public void before(Semaphore t) throws HyracksDataException {
+ secondaryFlushSemaphore.release();
+ throw new HyracksDataException("Kill the fluhs thread");
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+
+ }
+ });
+ StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, true);
+
+ primaryFlushSemaphore.acquire();
+ secondaryFlushSemaphore.acquire();
+ List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
+ List<ILSMDiskComponent> secondaryComponents = secondaryIndex.getDiskComponents();
+ Assert.assertEquals(secondaryComponents.size(), primaryComponents.size());
+ // shutdown the NC
+ nc.deInit(false);
+ }
+
+ private void insertRecords(int totalNumRecords, int recordsPerComponent) throws Exception {
+ StorageTestUtils.allowAllOps(primaryIndex);
+ StorageTestUtils.allowAllOps(secondaryIndex);
+ insertOp.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int i = 0; i < totalNumRecords; i++) {
+ // flush every RECORDS_PER_COMPONENT records
+ if (i % recordsPerComponent == 0 && i + 1 != totalNumRecords) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ StorageTestUtils.flush(dsLifecycleMgr, primaryIndex, false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+ }
+
+ private void checkComponentIds() throws HyracksDataException {
+ // check memory component
+ if (primaryIndex.isMemoryComponentsAllocated()) {
+ ILSMMemoryComponent primaryMemComponent = primaryIndex.getCurrentMemoryComponent();
+ ILSMMemoryComponent secondaryMemComponent = secondaryIndex.getCurrentMemoryComponent();
+ Assert.assertEquals(primaryMemComponent.getId(), secondaryMemComponent.getId());
+ }
+
+ List<ILSMDiskComponent> primaryDiskComponents = primaryIndex.getDiskComponents();
+ List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndex.getDiskComponents();
+
+ Assert.assertEquals(primaryDiskComponents.size(), secondaryDiskComponents.size());
+ for (int i = 0; i < primaryDiskComponents.size(); i++) {
+ Assert.assertEquals(primaryDiskComponents.get(i).getId(), secondaryDiskComponents.get(i).getId());
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index c452548..e4373f6 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -57,6 +57,7 @@
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
+import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -147,7 +148,7 @@
}
void unblockSearch(TestLsmBtree lsmBtree) {
- lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
lsmBtree.allowSearch(1);
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
index e7a455c..d9f2e20 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -57,6 +57,7 @@
import org.apache.hyracks.api.test.CountAnswer;
import org.apache.hyracks.api.test.FrameWriterTestUtils;
import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -92,25 +93,20 @@
NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
PARTITIONING_KEYS, null, null, null, false, null),
null, DatasetType.INTERNAL, DATASET_ID, 0);
- public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
- @Override
- public void before(Semaphore smeaphore) {
- smeaphore.release();
- }
-
- @Override
- public void after() {
- }
- };
private StorageTestUtils() {
}
static void allowAllOps(TestLsmBtree lsmBtree) {
- lsmBtree.addModifyCallback(ALLOW_CALLBACK);
- lsmBtree.addFlushCallback(ALLOW_CALLBACK);
- lsmBtree.addSearchCallback(ALLOW_CALLBACK);
- lsmBtree.addMergeCallback(ALLOW_CALLBACK);
+ lsmBtree.clearModifyCallbacks();
+ lsmBtree.clearFlushCallbacks();
+ lsmBtree.clearSearchCallbacks();
+ lsmBtree.clearMergeCallbacks();
+
+ lsmBtree.addModifyCallback(AllowTestOpCallback.INSTANCE);
+ lsmBtree.addFlushCallback(AllowTestOpCallback.INSTANCE);
+ lsmBtree.addSearchCallback(AllowTestOpCallback.INSTANCE);
+ lsmBtree.addMergeCallback(AllowTestOpCallback.INSTANCE);
}
public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, int partition)
@@ -121,8 +117,13 @@
public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return getInsertPipeline(nc, ctx, null);
+ }
+
+ public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx,
+ Index secondaryIndex) throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
- KEY_INDICATORS_LIST, STORAGE_MANAGER, null).getLeft();
+ KEY_INDICATORS_LIST, STORAGE_MANAGER, secondaryIndex).getLeft();
}
public static TupleGenerator getTupleGenerator() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a61b8f..9de8f73 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -49,6 +49,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
@@ -330,6 +331,9 @@
@Override
public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) {
DatasetResource dataset = datasets.get(datasetId);
+ if (dataset == null) {
+ return null;
+ }
ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
if (generator == null) {
populateOpTrackerAndIdGenerator(dataset, partition);
@@ -425,12 +429,26 @@
}
int partition = primaryOpTracker.getPartition();
Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+ ILSMIndex flushIndex = null;
+ for (ILSMIndex lsmIndex : indexes) {
+ if (!lsmIndex.isCurrentMutableComponentEmpty()) {
+ flushIndex = lsmIndex;
+ break;
+ }
+ }
+ if (flushIndex == null) {
+ // all open indexes are empty, nothing to flush
+ continue;
+ }
+ LSMComponentId componentId = (LSMComponentId) flushIndex.getCurrentMemoryComponent().getId();
ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition);
idGenerator.refresh();
if (dsInfo.isDurable()) {
+
synchronized (logRecord) {
- TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null);
+ TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), partition,
+ componentId.getMinId(), componentId.getMaxId(), null);
try {
logManager.log(logRecord);
} catch (ACIDException e) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index f7f2806..8ed4bb6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -117,6 +118,7 @@
if (needsFlush || flushOnExit) {
//Make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering them until the current flush is scheduled.
+ LSMComponentId primaryId = null;
for (ILSMIndex lsmIndex : indexes) {
ILSMOperationTracker opTracker = lsmIndex.getOperationTracker();
synchronized (opTracker) {
@@ -124,8 +126,14 @@
if (memComponent.getState() == ComponentState.READABLE_WRITABLE && memComponent.isModified()) {
memComponent.setState(ComponentState.READABLE_UNWRITABLE);
}
+ if (lsmIndex.isPrimaryIndex()) {
+ primaryId = (LSMComponentId) memComponent.getId();
+ }
}
}
+ if (primaryId == null) {
+ throw new IllegalStateException("Primary index not found in dataset " + dsInfo.getDatasetID());
+ }
LogRecord logRecord = new LogRecord();
flushOnExit = false;
if (dsInfo.isDurable()) {
@@ -133,7 +141,8 @@
* Generate a FLUSH log.
* Flush will be triggered when the log is written to disk by LogFlusher.
*/
- TransactionUtil.formFlushLogRecord(logRecord, datasetID, this);
+ TransactionUtil.formFlushLogRecord(logRecord, datasetID, partition, primaryId.getMinId(),
+ primaryId.getMaxId(), this);
try {
logManager.log(logRecord);
} catch (ACIDException e) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index bacebf1..412981c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -213,6 +213,13 @@
}
}
+ /**
+ * Used during the recovery process to force refresh the next component id
+ */
+ public void forceRefreshNextId() {
+ nextComponentIds[writeIndex] = idGenerator.getId();
+ }
+
public synchronized void setFirstLSN(long firstLSN) {
// We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
firstLSNs[writeIndex] = firstLSN;
@@ -258,7 +265,7 @@
@Override
public void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException {
ILSMComponentId componentId = getLSMComponentId();
- component.resetId(componentId);
+ component.resetId(componentId, false);
if (componentSwitched) {
recycleIndex = (recycleIndex + 1) % nextComponentIds.length;
}
@@ -269,7 +276,7 @@
if (component == lsmIndex.getCurrentMemoryComponent()) {
// only set the component id for the first (current) memory component
ILSMComponentId componentId = getLSMComponentId();
- component.resetId(componentId);
+ component.resetId(componentId, false);
}
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 7ddfdfb..e58a6fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -46,6 +46,8 @@
int SEQ_NUM_LEN = Long.BYTES;
int TYPE_LEN = Byte.BYTES;
int UUID_LEN = Long.BYTES;
+ int FLUSHING_COMPONENT_MINID_LEN = Long.BYTES;
+ int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES;
int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES;
int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
@@ -55,7 +57,8 @@
int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
int ENTITY_COMMIT_LOG_BASE_SIZE = ALL_RECORD_HEADER_LEN + ENTITYCOMMIT_UPDATE_HEADER_LEN + CHKSUM_LEN;
int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER;
- int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DatasetId.BYTES + CHKSUM_LEN;
+ int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN + FLUSHING_COMPONENT_MINID_LEN
+ + FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN;
int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
int MARKER_BASE_LOG_SIZE =
ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
@@ -176,4 +179,12 @@
* @return the flag
*/
boolean isReplicate();
+
+ long getFlushingComponentMinId();
+
+ void setFlushingComponentMinId(long flushingComponentMinId);
+
+ long getFlushingComponentMaxId();
+
+ void setFlushingComponentMaxId(long flushingComponentMaxId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 743a3fe..7e61266 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -81,6 +81,8 @@
private long checksum;
private long prevMarkerLSN;
private ByteBuffer marker;
+ private long flushingComponentMinId;
+ private long flushingComponentMaxId;
// ------------- fields in a log record (end) --------------//
private final ILogMarkerCallback callback; // A callback for log mark operations
private int PKFieldCnt;
@@ -141,6 +143,9 @@
break;
case LogType.FLUSH:
buffer.putInt(datasetId);
+ buffer.putInt(resourcePartition);
+ buffer.putLong(flushingComponentMinId);
+ buffer.putLong(flushingComponentMaxId);
break;
case LogType.MARKER:
buffer.putInt(datasetId);
@@ -238,13 +243,23 @@
txnId = buffer.getLong();
switch (logType) {
case LogType.FLUSH:
+ if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + FLUSHING_COMPONENT_MINID_LEN
+ + FLUSHING_COMPONENT_MAXID_LEN) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ datasetId = buffer.getInt();
+ resourcePartition = buffer.getInt();
+ flushingComponentMinId = buffer.getLong();
+ flushingComponentMaxId = buffer.getLong();
+ resourceId = 0l;
+ computeAndSetLogSize();
+ break;
+ case LogType.WAIT:
if (buffer.remaining() < ILogRecord.DS_LEN) {
return RecordReadStatus.TRUNCATED;
}
datasetId = buffer.getInt();
resourceId = 0l;
- // fall throuh
- case LogType.WAIT:
computeAndSetLogSize();
break;
case LogType.JOB_COMMIT:
@@ -710,4 +725,24 @@
public void setRequester(ILogRequester requester) {
this.requester = requester;
}
+
+ @Override
+ public long getFlushingComponentMinId() {
+ return flushingComponentMinId;
+ }
+
+ @Override
+ public void setFlushingComponentMinId(long flushingComponentMinId) {
+ this.flushingComponentMinId = flushingComponentMinId;
+ }
+
+ @Override
+ public long getFlushingComponentMaxId() {
+ return flushingComponentMaxId;
+ }
+
+ @Override
+ public void setFlushingComponentMaxId(long flushingComponentMaxId) {
+ this.flushingComponentMaxId = flushingComponentMaxId;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index c3af0f3..690eeb6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -45,10 +45,14 @@
logRecord.computeAndSetLogSize();
}
- public static void formFlushLogRecord(LogRecord logRecord, int datasetId, PrimaryIndexOperationTracker opTracker) {
+ public static void formFlushLogRecord(LogRecord logRecord, int datasetId, int resourcePartition,
+ long flushingComponentMinId, long flushingComponentMaxId, PrimaryIndexOperationTracker opTracker) {
logRecord.setLogType(LogType.FLUSH);
logRecord.setTxnId(-1);
logRecord.setDatasetId(datasetId);
+ logRecord.setResourcePartition(resourcePartition);
+ logRecord.setFlushingComponentMinId(flushingComponentMinId);
+ logRecord.setFlushingComponentMaxId(flushingComponentMaxId);
logRecord.setOpTracker(opTracker);
logRecord.computeAndSetLogSize();
}
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
index 1c57d1b..2ab5b4e 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
@@ -263,9 +263,11 @@
private void checkMemoryComponent(ILSMComponentId expected, ILSMMemoryComponent memoryComponent)
throws HyracksDataException {
- ArgumentCaptor<ILSMComponentId> argument = ArgumentCaptor.forClass(ILSMComponentId.class);
- Mockito.verify(memoryComponent).resetId(argument.capture());
- assertEquals(expected, argument.getValue());
+ ArgumentCaptor<ILSMComponentId> idArgument = ArgumentCaptor.forClass(ILSMComponentId.class);
+ ArgumentCaptor<Boolean> forceArgument = ArgumentCaptor.forClass(Boolean.class);
+ Mockito.verify(memoryComponent).resetId(idArgument.capture(), forceArgument.capture());
+ assertEquals(expected, idArgument.getValue());
+ assertEquals(false, forceArgument.getValue().booleanValue());
Mockito.reset(memoryComponent);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
index 5009614..6189e37 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
@@ -67,7 +67,9 @@
break;
case LogType.FLUSH:
RemoteLogRecord flushLog = new RemoteLogRecord();
- TransactionUtil.formFlushLogRecord(flushLog, reusableLog.getDatasetId(), null);
+ TransactionUtil.formFlushLogRecord(flushLog, reusableLog.getDatasetId(),
+ reusableLog.getResourcePartition(), reusableLog.getFlushingComponentMinId(),
+ reusableLog.getFlushingComponentMaxId(), null);
flushLog.setRequester(this);
flushLog.setLogSource(LogSource.REMOTE);
flushLog.setMasterLsn(reusableLog.getLSN());
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index c72d402..4ff6377 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -107,7 +107,9 @@
* Reset the component Id of the memory component after it's recycled
*
* @param newId
+ * @param force
+ * Whether to force-reset the id, skipping the usual sanity checks
* @throws HyracksDataException
*/
- void resetId(ILSMComponentId newId) throws HyracksDataException;
+ void resetId(ILSMComponentId newId, boolean force) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index 3fbef18..9596495 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -296,8 +296,8 @@
}
@Override
- public void resetId(ILSMComponentId componentId) throws HyracksDataException {
- if (this.componentId != null && !componentId.missing() // for backward compatibility
+ public void resetId(ILSMComponentId componentId, boolean force) throws HyracksDataException {
+ if (!force && this.componentId != null && !componentId.missing() // for backward compatibility
&& this.componentId.compareTo(componentId) != IdCompareResult.LESS_THAN) {
throw new IllegalStateException(
this + " receives illegal id. Old id " + this.componentId + ", new id " + componentId);
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java
new file mode 100644
index 0000000..19a9872
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/AllowTestOpCallback.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.impl;
+
+import java.util.concurrent.Semaphore;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AllowTestOpCallback implements ITestOpCallback<Semaphore> {
+
+ public static final ITestOpCallback<Semaphore> INSTANCE = new AllowTestOpCallback();
+
+ private AllowTestOpCallback() {
+ }
+
+ @Override
+ public void before(Semaphore t) throws HyracksDataException {
+ t.release();
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 3c781a6..1d4b7d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -53,6 +53,7 @@
public class TestLsmBtree extends LSMBTree {
// Semaphores are used to control operations
+ // Operations are allowed by default.
private final Semaphore modifySemaphore = new Semaphore(0);
private final Semaphore searchSemaphore = new Semaphore(0);
private final Semaphore flushSemaphore = new Semaphore(0);
@@ -91,6 +92,11 @@
filterFrameFactory, filterManager, bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy,
opTracker, ioScheduler, ioOperationCallbackFactory, needKeyDupCheck, btreeFields, filterFields, durable,
updateAware, tracer);
+
+ addModifyCallback(AllowTestOpCallback.INSTANCE);
+ addSearchCallback(AllowTestOpCallback.INSTANCE);
+ addFlushCallback(AllowTestOpCallback.INSTANCE);
+ addMergeCallback(AllowTestOpCallback.INSTANCE);
}
@Override
@@ -226,13 +232,13 @@
}
public void addModifyCallback(ITestOpCallback<Semaphore> modifyCallback) {
- synchronized (mergeCallbacks) {
+ synchronized (modifyCallbacks) {
modifyCallbacks.add(modifyCallback);
}
}
public void clearModifyCallbacks() {
- synchronized (mergeCallbacks) {
+ synchronized (modifyCallbacks) {
modifyCallbacks.clear();
}
}
@@ -329,6 +335,18 @@
}
}
+ public void addIoAfterFinalizeCallback(ITestOpCallback<Void> callback) {
+ synchronized (ioAfterFinalizeCallbacks) {
+ ioAfterFinalizeCallbacks.add(callback);
+ }
+ }
+
+ public void clearIoAfterFinalizeCallbacks() {
+ synchronized (ioAfterFinalizeCallbacks) {
+ ioAfterFinalizeCallbacks.clear();
+ }
+ }
+
@Override
public void allocateMemoryComponents() throws HyracksDataException {
synchronized (allocateComponentCallbacks) {