[NO ISSUE][STO] Recover from failure in memory allocation callback
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Previously, if an exception is thrown in the
ILSMIOOperationCallback.allocated call, then the memory component
is allocated but the flag memoryComponentsAllocated is false.
- Any subsequent attempt to modify the index will try to allocate
the component but since it has already been allocated, it will fail
with the exception: File is already mapped.
- In this change, if an exception is thrown from the callback, then
the component is de-allocated before throwing the exception.
- Test is case is added.
Change-Id: I80e605461df18c7f6d7785cd7504ca3acb4f45b1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2336
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 9828424..a46b029 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -20,64 +20,38 @@
import java.io.File;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
import java.util.function.Predicate;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
-import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
import org.apache.asterix.app.data.gen.TupleGenerator;
-import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.external.util.DataflowUtils;
-import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.types.IAType;
import org.apache.asterix.test.common.TestHelper;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Flusher;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Merger;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.test.CountAnswer;
-import org.apache.hyracks.api.test.FrameWriterTestUtils;
-import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
-import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -87,46 +61,16 @@
public class ComponentRollbackTest {
- private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
- private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
- new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
- private static final GenerationFunction[] RECORD_GEN_FUNCTION =
- { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
- private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
- private static final ARecordType META_TYPE = null;
- private static final GenerationFunction[] META_GEN_FUNCTION = null;
- private static final boolean[] UNIQUE_META_FIELDS = null;
- private static final int[] KEY_INDEXES = { 0 };
- private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
- private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
- private static final int TOTAL_NUM_OF_RECORDS = 10000;
- private static final int RECORDS_PER_COMPONENT = 1000;
- private static final int DATASET_ID = 101;
- private static final String DATAVERSE_NAME = "TestDV";
- private static final String DATASET_NAME = "TestDS";
- private static final String DATA_TYPE_NAME = "DUMMY";
- private static final String NODE_GROUP_NAME = "DEFAULT";
private static final Predicate<ILSMComponent> memoryComponentsPredicate = c -> c instanceof ILSMMemoryComponent;
- private static final StorageComponentProvider storageManager = new StorageComponentProvider();
private static TestNodeController nc;
private static TestLsmBtree lsmBtree;
private static NCAppRuntimeContext ncAppCtx;
private static IDatasetLifecycleManager dsLifecycleMgr;
- private static Dataset dataset;
private static IHyracksTaskContext ctx;
private static IIndexDataflowHelper indexDataflowHelper;
private static ITransactionContext txnCtx;
private static LSMInsertDeleteOperatorNodePushable insertOp;
- public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
- @Override
- public void before(Semaphore smeaphore) {
- smeaphore.release();
- }
-
- @Override
- public void after() {
- }
- };
+ private static final int PARTITION = 0;
@BeforeClass
public static void setUp() throws Exception {
@@ -149,27 +93,18 @@
@Before
public void createIndex() throws Exception {
- List<List<String>> partitioningKeys = new ArrayList<>();
- partitioningKeys.add(Collections.singletonList("key"));
- int partition = 0;
- dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
- NODE_GROUP_NAME, NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null,
- PartitioningStrategy.HASH, partitioningKeys, null, null, null, false, null),
- null, DatasetType.INTERNAL, DATASET_ID, 0);
- PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
- storageManager, KEY_INDEXES, KEY_INDICATORS_LIST, partition);
+ PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
IndexDataflowHelperFactory iHelperFactory =
new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
JobId jobId = nc.newJobId();
- ctx = nc.createTestContext(jobId, partition, false);
- indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+ ctx = nc.createTestContext(jobId, PARTITION, false);
+ indexDataflowHelper = iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
indexDataflowHelper.open();
lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
indexDataflowHelper.close();
txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
- insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
- KEY_INDICATORS_LIST, storageManager, null).getLeft();
+ insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
}
@After
@@ -177,26 +112,18 @@
indexDataflowHelper.destroy();
}
- static void allowAllOps(TestLsmBtree lsmBtree) {
- lsmBtree.addModifyCallback(ALLOW_CALLBACK);
- lsmBtree.addFlushCallback(ALLOW_CALLBACK);
- lsmBtree.addSearchCallback(ALLOW_CALLBACK);
- lsmBtree.addMergeCallback(ALLOW_CALLBACK);
- }
-
@Test
public void testRollbackWhileNoOp() {
try {
// allow all operations
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -210,28 +137,28 @@
}
insertOp.close();
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
-
// get all components
List<ILSMMemoryComponent> memComponents = lsmBtree.getMemoryComponents();
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+ StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
- dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+ StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -239,28 +166,21 @@
}
public void flush(boolean async) throws Exception {
- flush(dsLifecycleMgr, lsmBtree, dataset, async);
- }
-
- public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
- boolean async) throws Exception {
- waitForOperations(lsmBtree);
- dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async);
+ StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, async);
}
@Test
public void testRollbackThenInsert() {
try {
// allow all operations
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -279,23 +199,22 @@
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
- dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+ StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
// insert again
nc.newJobId();
txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
- insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
- KEY_INDICATORS_LIST, storageManager, null).getLeft();
+ insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
insertOp.open();
- for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
+ for (int j = 0; j < StorageTestUtils.RECORDS_PER_COMPONENT; j++) {
ITupleReference tuple = tupleGenerator.next();
DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
}
@@ -304,16 +223,16 @@
}
insertOp.close();
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
- dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+ StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -324,16 +243,15 @@
public void testRollbackWhileSearch() {
try {
// allow all operations but search
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearSearchCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -353,42 +271,43 @@
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// wait till firstSearcher enter the components
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
- dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(
c -> (c instanceof ILSMMemoryComponent && ((ILSMMemoryComponent) c).isModified()));
// now that the rollback has completed, we will unblock the search
- lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
// search now and ensure
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+ StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
// rollback the last disk component
// re-block searches
lsmBtree.clearSearchCallbacks();
- Searcher secondSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree,
- TOTAL_NUM_OF_RECORDS - RECORDS_PER_COMPONENT);
+ Searcher secondSearcher = new Searcher(nc, PARTITION, lsmBtree,
+ StorageTestUtils.TOTAL_NUM_OF_RECORDS - StorageTestUtils.RECORDS_PER_COMPONENT);
// wait till firstSearcher enter the components
secondSearcher.waitUntilEntered();
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
// now that the rollback has completed, we will unblock the search
- lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
lsmBtree.allowSearch(1);
Assert.assertTrue(secondSearcher.result());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS - (2 * RECORDS_PER_COMPONENT));
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION,
+ StorageTestUtils.TOTAL_NUM_OF_RECORDS - (2 * StorageTestUtils.RECORDS_PER_COMPONENT));
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -399,15 +318,14 @@
public void testRollbackWhileFlush() {
try {
// allow all operations
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -426,7 +344,7 @@
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// disable flushes
lsmBtree.clearFlushCallbacks();
Flusher firstFlusher = new Flusher(lsmBtree);
@@ -435,7 +353,7 @@
// now that we enetered, we will rollback. This will not proceed since it is waiting for the flush to complete
Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
// now that the rollback has completed, we will search
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
//unblock the flush
lsmBtree.allowFlush(1);
// ensure rollback completed
@@ -443,7 +361,7 @@
// ensure current mem component is not modified
Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
// search now and ensure
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -454,16 +372,15 @@
public void testRollbackWhileMerge() {
try {
// allow all operations but merge
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -482,7 +399,7 @@
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// Now, we will start a full merge
Merger merger = new Merger(lsmBtree);
ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -499,7 +416,7 @@
Rollerback rollerback = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
// rollback is now waiting for the merge to complete
// we will search
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
//unblock the merge
lsmBtree.allowMerge(1);
// ensure rollback completes
@@ -507,8 +424,8 @@
// ensure current mem component is not modified
Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
// search now and ensure that we rolled back the merged component
- searchAndAssertCount(nc, 0, dataset, storageManager,
- TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+ - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -519,15 +436,14 @@
public void testRollbackWhileFlushAndSearchFlushExistsFirst() {
try {
// allow all operations
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -546,7 +462,7 @@
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// disable flushes
// disable searches
lsmBtree.clearFlushCallbacks();
@@ -554,21 +470,21 @@
Flusher firstFlusher = new Flusher(lsmBtree);
flush(true);
firstFlusher.waitUntilCount(1);
- Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// wait till firstSearcher enter the components
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback rollback a memory component
Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
//unblock the flush
lsmBtree.allowFlush(1);
- lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
// ensure current mem component is not modified
rollerback.complete();
Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
// search now and ensure the rollback was no op since it waits for ongoing flushes
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -579,15 +495,14 @@
public void testRollbackWhileFlushAndSearchSearchExistsFirst() {
try {
// allow all operations
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -606,7 +521,7 @@
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// disable flushes
// disable searches
lsmBtree.clearFlushCallbacks();
@@ -614,13 +529,13 @@
flush(true);
firstFlusher.waitUntilCount(1);
lsmBtree.clearSearchCallbacks();
- Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// wait till firstSearcher enter the components
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
Rollerback rollerback = new Rollerback(lsmBtree, memoryComponentsPredicate);
// The rollback will be waiting for the flush to complete
- lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
//unblock the flush
@@ -629,7 +544,7 @@
rollerback.complete();
Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
// search now and ensure
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
} catch (Throwable e) {
e.printStackTrace();
Assert.fail(e.getMessage());
@@ -640,16 +555,15 @@
public void testRollbackWhileMergeAndSearchMergeExitsFirst() {
try {
// allow all operations except merge
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -668,7 +582,7 @@
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// Now, we will start a merge
Merger merger = new Merger(lsmBtree);
ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -683,7 +597,7 @@
merger.waitUntilCount(1);
// we will block search
lsmBtree.clearSearchCallbacks();
- Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// wait till firstSearcher enter the components
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
@@ -692,13 +606,13 @@
// unblock the merge
lsmBtree.allowMerge(1);
// unblock the search
- lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
rollerback.complete();
// now that the rollback has completed, we will search
- searchAndAssertCount(nc, 0, dataset, storageManager,
- TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+ - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
// ensure current mem component is not modified
Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
} catch (Throwable e) {
@@ -711,16 +625,15 @@
public void testRollbackWhileMergeAndSearchSearchExitsFirst() {
try {
// allow all operations except merge
- allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
lsmBtree.clearMergeCallbacks();
insertOp.open();
- TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
- RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
- for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
+ for (int j = 0; j < StorageTestUtils.TOTAL_NUM_OF_RECORDS; j++) {
// flush every RECORDS_PER_COMPONENT records
- if (j % RECORDS_PER_COMPONENT == 0 && j + 1 != TOTAL_NUM_OF_RECORDS) {
+ if (j % StorageTestUtils.RECORDS_PER_COMPONENT == 0 && j + 1 != StorageTestUtils.TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
@@ -739,7 +652,7 @@
List<ILSMDiskComponent> diskComponents = lsmBtree.getDiskComponents();
Assert.assertEquals(9, diskComponents.size());
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// Now, we will start a merge
Merger merger = new Merger(lsmBtree);
ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -754,22 +667,22 @@
merger.waitUntilCount(1);
// we will block search
lsmBtree.clearSearchCallbacks();
- Searcher firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
+ Searcher firstSearcher = new Searcher(nc, PARTITION, lsmBtree, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
// wait till firstSearcher enter the components
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
Rollerback rollerBack = new Rollerback(lsmBtree, new DiskComponentLsnPredicate(lsn));
// unblock the search
- lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
lsmBtree.allowSearch(1);
Assert.assertTrue(firstSearcher.result());
// even though rollback has been called, it is still waiting for the merge to complete
- searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
//unblock the merge
lsmBtree.allowMerge(1);
rollerBack.complete();
- searchAndAssertCount(nc, 0, dataset, storageManager,
- TOTAL_NUM_OF_RECORDS - ((numMergedComponents + 1/*memory component*/) * RECORDS_PER_COMPONENT));
+ StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS
+ - ((numMergedComponents + 1/*memory component*/) * StorageTestUtils.RECORDS_PER_COMPONENT));
// ensure current mem component is not modified
Assert.assertFalse(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
} catch (Throwable e) {
@@ -789,7 +702,7 @@
public void run() {
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- dsLifecycleMgr.getComponentIdGenerator(DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
lsmAccessor.deleteComponents(predicate);
} catch (HyracksDataException e) {
@@ -809,102 +722,6 @@
}
}
- static class Searcher {
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
- private Future<Boolean> task;
- private volatile boolean entered = false;
-
- public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
- TestLsmBtree lsmBtree, int numOfRecords) {
- lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
-
- @Override
- public void before(Semaphore sem) {
- synchronized (Searcher.this) {
- entered = true;
- Searcher.this.notifyAll();
- }
- }
-
- @Override
- public void after() {
- }
- });
- Callable<Boolean> callable = new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords);
- return true;
- }
- };
- task = executor.submit(callable);
- }
-
- boolean result() throws Exception {
- return task.get();
- }
-
- synchronized void waitUntilEntered() throws InterruptedException {
- while (!entered) {
- this.wait();
- }
- }
- }
-
- private class Merger {
- private volatile int count = 0;
-
- public Merger(TestLsmBtree lsmBtree) {
- lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() {
-
- @Override
- public void before(Semaphore smeaphore) {
- synchronized (Merger.this) {
- count++;
- Merger.this.notifyAll();
- }
- }
-
- @Override
- public void after() {
- }
- });
- }
-
- synchronized void waitUntilCount(int count) throws InterruptedException {
- while (this.count != count) {
- this.wait();
- }
- }
- }
-
- private class Flusher {
- private volatile int count = 0;
-
- public Flusher(TestLsmBtree lsmBtree) {
- lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() {
-
- @Override
- public void before(Semaphore smeaphore) {
- synchronized (Flusher.this) {
- count++;
- Flusher.this.notifyAll();
- }
- }
-
- @Override
- public void after() {
- }
- });
- }
-
- synchronized void waitUntilCount(int count) throws InterruptedException {
- while (this.count != count) {
- this.wait();
- }
- }
- }
-
private static class DiskComponentLsnPredicate implements Predicate<ILSMComponent> {
private final long lsn;
@@ -924,49 +741,4 @@
}
}
}
-
- static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
- StorageComponentProvider storageManager, int numOfRecords)
- throws HyracksDataException, AlgebricksException {
- JobId jobId = nc.newJobId();
- IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false);
- TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
- Collections.emptyList(), Collections.emptyList(), false);
- IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
- new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
- emptyTupleOp.open();
- emptyTupleOp.close();
- Assert.assertEquals(numOfRecords, countOp.getCount());
- }
-
- public static void waitForOperations(ILSMIndex index) throws InterruptedException {
- // wait until number of activeOperation reaches 0
- PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
- long maxWaitTime = 60000L; // 1 minute
- long before = System.currentTimeMillis();
- while (opTracker.getNumActiveOperations() > 0) {
- Thread.sleep(5); // NOSONAR: Test code with a timeout
- if (System.currentTimeMillis() - before > maxWaitTime) {
- throw new IllegalStateException(
- (System.currentTimeMillis() - before) + "ms passed without completing the frame operation");
- }
- }
- }
-
- public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
- Collection<FrameWriterOperation> exceptionThrowingOperations,
- Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
- CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
- exceptionThrowingOperations, errorThrowingOperations);
- CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
- exceptionThrowingOperations, errorThrowingOperations);
- CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
- exceptionThrowingOperations, errorThrowingOperations);
- CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
- exceptionThrowingOperations, errorThrowingOperations);
- CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
- exceptionThrowingOperations, errorThrowingOperations);
- return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
- closeAnswer, deepCopyInputFrames);
- }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
new file mode 100644
index 0000000..8bafd32
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/IoCallbackFailureTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class IoCallbackFailureTest {
+
+ private static final int PARTITION = 0;
+ private static TestNodeController nc;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+ + File.separator + "resources" + File.separator + "cc.conf";
+ nc = new TestNodeController(configPath, false);
+ nc.init();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ System.out.println("TearDown");
+ nc.deInit();
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Test
+ public void testTempFailureInAllocateCallback() throws Exception {
+ PrimaryIndexInfo primaryIndexInfo = StorageTestUtils.createPrimaryIndex(nc, PARTITION);
+ IndexDataflowHelperFactory iHelperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+ JobId jobId = nc.newJobId();
+ IHyracksTaskContext ctx = nc.createTestContext(jobId, PARTITION, false);
+ IIndexDataflowHelper indexDataflowHelper =
+ iHelperFactory.create(ctx.getJobletContext().getServiceContext(), PARTITION);
+ indexDataflowHelper.open();
+ TestLsmBtree lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
+ indexDataflowHelper.close();
+ LSMInsertDeleteOperatorNodePushable insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+ StorageTestUtils.allowAllOps(lsmBtree);
+ ITestOpCallback<ILSMMemoryComponent> failCallback = new ITestOpCallback<ILSMMemoryComponent>() {
+ @SuppressWarnings("deprecation")
+ @Override
+ public void before(ILSMMemoryComponent c) throws HyracksDataException {
+ throw new HyracksDataException("Fail on allocate callback");
+ }
+
+ @Override
+ public void after() throws HyracksDataException {
+ // No Op
+ }
+ };
+ lsmBtree.addIoAllocateCallback(failCallback);
+ boolean expectedExceptionThrown = false;
+ try {
+ insert(nc, lsmBtree, ctx, insertOp, StorageTestUtils.TOTAL_NUM_OF_RECORDS,
+ StorageTestUtils.RECORDS_PER_COMPONENT);
+ } catch (Exception e) {
+ expectedExceptionThrown = true;
+ }
+ Assert.assertTrue(expectedExceptionThrown);
+ // Clear the callback and retry
+ lsmBtree.clearIoAllocateCallback();
+ jobId = nc.newJobId();
+ ctx = nc.createTestContext(jobId, PARTITION, false);
+ insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+ insert(nc, lsmBtree, ctx, insertOp, StorageTestUtils.TOTAL_NUM_OF_RECORDS,
+ StorageTestUtils.RECORDS_PER_COMPONENT);
+ }
+
+ private static void insert(TestNodeController nc, TestLsmBtree lsmBtree, IHyracksTaskContext ctx,
+ LSMInsertDeleteOperatorNodePushable insertOp, int totalNumRecords, int recordsPerComponent)
+ throws Exception {
+ NCAppRuntimeContext ncAppCtx = nc.getAppRuntimeContext();
+ IDatasetLifecycleManager dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
+ TupleGenerator tupleGenerator = StorageTestUtils.getTupleGenerator();
+ ITransactionContext txnCtx = nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
+ new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+ boolean failed = false;
+ try {
+ try {
+ insertOp.open();
+ VSizeFrame frame = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ for (int j = 0; j < totalNumRecords; j++) {
+ // flush every recordsPerComponent records
+ if (j % recordsPerComponent == 0 && j + 1 != totalNumRecords) {
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, false);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ } catch (Throwable th) {
+ failed = true;
+ insertOp.fail();
+ throw th;
+ } finally {
+ insertOp.close();
+ }
+ } finally {
+ if (failed) {
+ nc.getTransactionManager().abortTransaction(txnCtx.getTxnId());
+ } else {
+ nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
+ }
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index b39a5c6..367d0b9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -189,8 +189,8 @@
}
// allow all operations
for (int i = 0; i < NUM_PARTITIONS; i++) {
- ComponentRollbackTest.allowAllOps(primaryLsmBtrees[i]);
- ComponentRollbackTest.allowAllOps(secondaryLsmBtrees[i]);
+ StorageTestUtils.allowAllOps(primaryLsmBtrees[i]);
+ StorageTestUtils.allowAllOps(secondaryLsmBtrees[i]);
actors[i].add(new Request(Request.Action.INSERT_OPEN));
}
}
@@ -224,9 +224,9 @@
}
ensureDone(actors[0]);
// search now and ensure partition 0 has all the records
- ComponentRollbackTest.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, TOTAL_NUM_OF_RECORDS);
// and that partition 1 has no records
- ComponentRollbackTest.searchAndAssertCount(nc, 1, dataset, storageManager, 0);
+ StorageTestUtils.searchAndAssertCount(nc, 1, dataset, storageManager, 0);
// and that partition 0 has numFlushes disk components
Assert.assertEquals(totalNumOfComponents, primaryLsmBtrees[0].getDiskComponents().size());
// and that partition 1 has no disk components
@@ -655,7 +655,7 @@
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOps[partition], true);
}
- ComponentRollbackTest.waitForOperations(primaryLsmBtrees[partition]);
+ StorageTestUtils.waitForOperations(primaryLsmBtrees[partition]);
break;
default:
break;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
index 7bc7a88..c452548 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/SearchCursorComponentSwitchTest.java
@@ -45,7 +45,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.test.common.TestHelper;
-import org.apache.asterix.test.dataflow.ComponentRollbackTest.Searcher;
+import org.apache.asterix.test.dataflow.StorageTestUtils.Searcher;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -147,7 +147,7 @@
}
void unblockSearch(TestLsmBtree lsmBtree) {
- lsmBtree.addSearchCallback(ComponentRollbackTest.ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(StorageTestUtils.ALLOW_CALLBACK);
lsmBtree.allowSearch(1);
}
@@ -155,7 +155,7 @@
public void testCursorSwitchSucceed() {
try {
// allow all operations
- ComponentRollbackTest.allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
// except search
lsmBtree.clearSearchCallbacks();
insertOp.open();
@@ -170,7 +170,7 @@
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
- ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+ StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
}
ITupleReference tuple = tupleGenerator.next();
DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -183,7 +183,7 @@
firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
// wait till firstSearcher enter the components
firstSearcher.waitUntilEntered();
- ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+ StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
// unblock the search
unblockSearch(lsmBtree);
@@ -201,7 +201,7 @@
public void testCursorSwitchFails() {
try {
// allow all operations
- ComponentRollbackTest.allowAllOps(lsmBtree);
+ StorageTestUtils.allowAllOps(lsmBtree);
// except search
lsmBtree.clearSearchCallbacks();
insertOp.open();
@@ -216,7 +216,7 @@
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
}
- ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+ StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
}
ITupleReference tuple = tupleGenerator.next();
DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
@@ -229,7 +229,7 @@
firstSearcher = new Searcher(nc, 0, dataset, storageManager, lsmBtree, TOTAL_NUM_OF_RECORDS);
// wait till firstSearcher enter the components
firstSearcher.waitUntilEntered();
- ComponentRollbackTest.flush(dsLifecycleMgr, lsmBtree, dataset, false);
+ StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, dataset, false);
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
// merge all components
ILSMIndexAccessor mergeAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -255,7 +255,7 @@
throws HyracksDataException, AlgebricksException {
nc.newJobId();
TestTupleCounterFrameWriter countOp =
- ComponentRollbackTest.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+ StorageTestUtils.create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
Collections.emptyList(), Collections.emptyList(), false);
IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
new file mode 100644
index 0000000..e7a455c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/StorageTestUtils.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.bootstrap.TestNodeController.PrimaryIndexInfo;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
+import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.Assert;
+
+public class StorageTestUtils {
+
+ public static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ public static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ public static final GenerationFunction[] RECORD_GEN_FUNCTION =
+ { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+ public static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+ public static final ARecordType META_TYPE = null;
+ public static final GenerationFunction[] META_GEN_FUNCTION = null;
+ public static final boolean[] UNIQUE_META_FIELDS = null;
+ public static final int[] KEY_INDEXES = { 0 };
+ public static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
+ public static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+ public static final int TOTAL_NUM_OF_RECORDS = 10000;
+ public static final int RECORDS_PER_COMPONENT = 1000;
+ public static final int DATASET_ID = 101;
+ public static final String DATAVERSE_NAME = "TestDV";
+ public static final String DATASET_NAME = "TestDS";
+ public static final String DATA_TYPE_NAME = "DUMMY";
+ public static final String NODE_GROUP_NAME = "DEFAULT";
+ public static final StorageComponentProvider STORAGE_MANAGER = new StorageComponentProvider();
+ public static final List<List<String>> PARTITIONING_KEYS =
+ new ArrayList<>(Collections.singletonList(Collections.singletonList(RECORD_TYPE.getFieldNames()[0])));
+ public static final TestDataset DATASET =
+ new TestDataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ PARTITIONING_KEYS, null, null, null, false, null),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
+ public static final ITestOpCallback<Semaphore> ALLOW_CALLBACK = new ITestOpCallback<Semaphore>() {
+ @Override
+ public void before(Semaphore smeaphore) {
+ smeaphore.release();
+ }
+
+ @Override
+ public void after() {
+ }
+ };
+
+ private StorageTestUtils() {
+ }
+
+ static void allowAllOps(TestLsmBtree lsmBtree) {
+ lsmBtree.addModifyCallback(ALLOW_CALLBACK);
+ lsmBtree.addFlushCallback(ALLOW_CALLBACK);
+ lsmBtree.addSearchCallback(ALLOW_CALLBACK);
+ lsmBtree.addMergeCallback(ALLOW_CALLBACK);
+ }
+
+ public static PrimaryIndexInfo createPrimaryIndex(TestNodeController nc, int partition)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.createPrimaryIndex(DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, STORAGE_MANAGER, KEY_INDEXES,
+ KEY_INDICATORS_LIST, partition);
+ }
+
+ public static LSMInsertDeleteOperatorNodePushable getInsertPipeline(TestNodeController nc, IHyracksTaskContext ctx)
+ throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
+ return nc.getInsertPipeline(ctx, DATASET, KEY_TYPES, RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, STORAGE_MANAGER, null).getLeft();
+ }
+
+ public static TupleGenerator getTupleGenerator() {
+ return new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION,
+ UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ }
+
+ public static void searchAndAssertCount(TestNodeController nc, int partition, int numOfRecords)
+ throws HyracksDataException, AlgebricksException {
+ searchAndAssertCount(nc, partition, DATASET, STORAGE_MANAGER, numOfRecords);
+ }
+
+ public static void searchAndAssertCount(TestNodeController nc, int partition, Dataset dataset,
+ StorageComponentProvider storageManager, int numOfRecords)
+ throws HyracksDataException, AlgebricksException {
+ JobId jobId = nc.newJobId();
+ IHyracksTaskContext ctx = nc.createTestContext(jobId, partition, false);
+ TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+ Collections.emptyList(), Collections.emptyList(), false);
+ IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE,
+ new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATORS_LIST, storageManager);
+ emptyTupleOp.open();
+ emptyTupleOp.close();
+ Assert.assertEquals(numOfRecords, countOp.getCount());
+ }
+
+ public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+ Collection<FrameWriterOperation> exceptionThrowingOperations,
+ Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+ CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+ exceptionThrowingOperations, errorThrowingOperations);
+ return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+ closeAnswer, deepCopyInputFrames);
+ }
+
+ public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, boolean async)
+ throws Exception {
+ flush(dsLifecycleMgr, lsmBtree, DATASET, async);
+ }
+
+ public static void flush(IDatasetLifecycleManager dsLifecycleMgr, TestLsmBtree lsmBtree, Dataset dataset,
+ boolean async) throws Exception {
+ waitForOperations(lsmBtree);
+ dsLifecycleMgr.flushDataset(dataset.getDatasetId(), async);
+ }
+
+ public static void waitForOperations(ILSMIndex index) throws InterruptedException {
+ // wait until number of activeOperation reaches 0
+ PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) index.getOperationTracker();
+ long maxWaitTime = 60000L; // 1 minute
+ long before = System.currentTimeMillis();
+ while (opTracker.getNumActiveOperations() > 0) {
+ Thread.sleep(5); // NOSONAR: Test code with a timeout
+ if (System.currentTimeMillis() - before > maxWaitTime) {
+ throw new IllegalStateException(
+ (System.currentTimeMillis() - before) + "ms passed without completing the frame operation");
+ }
+ }
+ }
+
+ public static class Searcher {
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private Future<Boolean> task;
+ private volatile boolean entered = false;
+
+ public Searcher(TestNodeController nc, int partition, TestLsmBtree lsmBtree, int numOfRecords) {
+ this(nc, partition, DATASET, STORAGE_MANAGER, lsmBtree, numOfRecords);
+ }
+
+ public Searcher(TestNodeController nc, int partition, Dataset dataset, StorageComponentProvider storageManager,
+ TestLsmBtree lsmBtree, int numOfRecords) {
+ lsmBtree.addSearchCallback(new ITestOpCallback<Semaphore>() {
+
+ @Override
+ public void before(Semaphore sem) {
+ synchronized (Searcher.this) {
+ entered = true;
+ Searcher.this.notifyAll();
+ }
+ }
+
+ @Override
+ public void after() {
+ }
+ });
+ Callable<Boolean> callable = new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ searchAndAssertCount(nc, partition, dataset, storageManager, numOfRecords);
+ return true;
+ }
+ };
+ task = executor.submit(callable);
+ }
+
+ public boolean result() throws Exception {
+ return task.get();
+ }
+
+ public synchronized void waitUntilEntered() throws InterruptedException {
+ while (!entered) {
+ this.wait();
+ }
+ }
+ }
+
+ public static class Merger {
+ private volatile int count = 0;
+
+ public Merger(TestLsmBtree lsmBtree) {
+ lsmBtree.addMergeCallback(new ITestOpCallback<Semaphore>() {
+
+ @Override
+ public void before(Semaphore smeaphore) {
+ synchronized (Merger.this) {
+ count++;
+ Merger.this.notifyAll();
+ }
+ }
+
+ @Override
+ public void after() {
+ }
+ });
+ }
+
+ public synchronized void waitUntilCount(int count) throws InterruptedException {
+ while (this.count != count) {
+ this.wait();
+ }
+ }
+ }
+
+ public static class Flusher {
+ private volatile int count = 0;
+
+ public Flusher(TestLsmBtree lsmBtree) {
+ lsmBtree.addFlushCallback(new ITestOpCallback<Semaphore>() {
+
+ @Override
+ public void before(Semaphore smeaphore) {
+ synchronized (Flusher.this) {
+ count++;
+ Flusher.this.notifyAll();
+ }
+ }
+
+ @Override
+ public void after() {
+ }
+ });
+ }
+
+ public synchronized void waitUntilCount(int count) throws InterruptedException {
+ while (this.count != count) {
+ this.wait();
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 453431d..905c99d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -81,12 +81,28 @@
throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_ACTIVE_INDEX);
}
fileId = bufferCache.createFile(file);
- bufferCache.openFile(fileId);
- freePageManager.open(fileId);
- freePageManager.init(interiorFrameFactory, leafFrameFactory);
- setRootPage();
- freePageManager.close();
- bufferCache.closeFile(fileId);
+ boolean failed = true;
+ try {
+ bufferCache.openFile(fileId);
+ failed = false;
+ } finally {
+ if (failed) {
+ bufferCache.deleteFile(fileId);
+ }
+ }
+ failed = true;
+ try {
+ freePageManager.open(fileId);
+ freePageManager.init(interiorFrameFactory, leafFrameFactory);
+ setRootPage();
+ freePageManager.close();
+ failed = false;
+ } finally {
+ bufferCache.closeFile(fileId);
+ if (failed) {
+ bufferCache.deleteFile(fileId);
+ }
+ }
}
private void setRootPage() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
index f892585..c72d402 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMemoryComponent.java
@@ -76,7 +76,8 @@
void setState(ComponentState state);
/**
- * Allocates memory to this component, create and activate it
+ * Allocates memory to this component, create and activate it.
+ * This method is atomic. If an exception is thrown, then the call had no effect.
*
* @throws HyracksDataException
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index e9f410d..749b3ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -438,9 +438,29 @@
if (memoryComponentsAllocated || memoryComponents == null) {
return;
}
- for (ILSMMemoryComponent c : memoryComponents) {
- c.allocate();
- ioOpCallback.allocated(c);
+ int i = 0;
+ boolean allocated = false;
+ try {
+ for (; i < memoryComponents.size(); i++) {
+ allocated = false;
+ ILSMMemoryComponent c = memoryComponents.get(i);
+ c.allocate();
+ allocated = true;
+ ioOpCallback.allocated(c);
+ }
+ } finally {
+ if (i < memoryComponents.size()) {
+ // something went wrong
+ if (allocated) {
+ ILSMMemoryComponent c = memoryComponents.get(i);
+ c.deallocate();
+ }
+ // deallocate all previous components
+ for (int j = i - 1; j >= 0; j--) {
+ ILSMMemoryComponent c = memoryComponents.get(j);
+ c.deallocate();
+ }
+ }
}
memoryComponentsAllocated = true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index c0bef7d..3fbef18 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -237,19 +237,40 @@
@Override
public final void allocate() throws HyracksDataException {
+ boolean allocated = false;
((IVirtualBufferCache) getIndex().getBufferCache()).open();
- doAllocate();
+ try {
+ doAllocate();
+ allocated = true;
+ } finally {
+ if (!allocated) {
+ ((IVirtualBufferCache) getIndex().getBufferCache()).close();
+ }
+ }
}
protected void doAllocate() throws HyracksDataException {
- getIndex().create();
- getIndex().activate();
+ boolean created = false;
+ boolean activated = false;
+ try {
+ getIndex().create();
+ created = true;
+ getIndex().activate();
+ activated = true;
+ } finally {
+ if (created && !activated) {
+ getIndex().destroy();
+ }
+ }
}
@Override
public final void deallocate() throws HyracksDataException {
- doDeallocate();
- getIndex().getBufferCache().close();
+ try {
+ doDeallocate();
+ } finally {
+ getIndex().getBufferCache().close();
+ }
}
protected void doDeallocate() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 7a3d58b..9b25471 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -108,7 +108,15 @@
public synchronized void open() throws HyracksDataException {
++openCount;
if (openCount == 1) {
- vbc.open();
+ boolean failed = true;
+ try {
+ vbc.open();
+ failed = false;
+ } finally {
+ if (failed) {
+ openCount--;
+ }
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
index 816550b..19b4856 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndex.java
@@ -34,6 +34,7 @@
* Initializes the persistent state of an index.
* An index cannot be created if it is in the activated state.
* Calling create on an index that is deactivated has the effect of clearing the index.
+ * This method is atomic. If an exception is thrown, then the call had no effect.
*
* @throws HyracksDataException
* if there is an error in the BufferCache while (un)pinning pages, (un)latching pages,
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
index acc3347..e888238 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java
@@ -18,8 +18,10 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.impl;
-public interface ITestOpCallback<T> {
- void before(T t);
+import org.apache.hyracks.api.exceptions.HyracksDataException;
- void after();
+public interface ITestOpCallback<T> {
+ void before(T t) throws HyracksDataException;
+
+ void after() throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index bf3bb31..3c781a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -109,7 +109,7 @@
}
}
- public static <T> void callback(ITestOpCallback<T> callback, T t) {
+ public static <T> void callback(ITestOpCallback<T> callback, T t) throws HyracksDataException {
if (callback != null) {
callback.before(t);
}
@@ -344,7 +344,7 @@
}
}
- public void beforeIoOperationCalled() {
+ public void beforeIoOperationCalled() throws HyracksDataException {
synchronized (ioBeforeCallbacks) {
for (ITestOpCallback<Void> callback : ioBeforeCallbacks) {
callback.before(null);
@@ -352,7 +352,7 @@
}
}
- public void beforeIoOperationReturned() {
+ public void beforeIoOperationReturned() throws HyracksDataException {
synchronized (ioBeforeCallbacks) {
for (ITestOpCallback<Void> callback : ioBeforeCallbacks) {
callback.after();
@@ -360,7 +360,7 @@
}
}
- public void afterIoOperationCalled() {
+ public void afterIoOperationCalled() throws HyracksDataException {
synchronized (ioAfterOpCallbacks) {
for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) {
callback.before(null);
@@ -368,7 +368,7 @@
}
}
- public void afterIoOperationReturned() {
+ public void afterIoOperationReturned() throws HyracksDataException {
synchronized (ioAfterOpCallbacks) {
for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) {
callback.after();
@@ -376,7 +376,7 @@
}
}
- public void afterIoFinalizeCalled() {
+ public void afterIoFinalizeCalled() throws HyracksDataException {
synchronized (ioAfterFinalizeCallbacks) {
for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) {
callback.before(null);
@@ -384,7 +384,7 @@
}
}
- public void afterIoFinalizeReturned() {
+ public void afterIoFinalizeReturned() throws HyracksDataException {
synchronized (ioAfterFinalizeCallbacks) {
for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) {
callback.after();
@@ -392,7 +392,7 @@
}
}
- public void recycledCalled(ILSMMemoryComponent component) {
+ public void recycledCalled(ILSMMemoryComponent component) throws HyracksDataException {
synchronized (ioRecycleCallbacks) {
for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) {
callback.before(component);
@@ -400,7 +400,7 @@
}
}
- public void recycledReturned(ILSMMemoryComponent component) {
+ public void recycledReturned(ILSMMemoryComponent component) throws HyracksDataException {
synchronized (ioRecycleCallbacks) {
for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) {
callback.after();
@@ -408,7 +408,7 @@
}
}
- public void allocatedCalled(ILSMMemoryComponent component) {
+ public void allocatedCalled(ILSMMemoryComponent component) throws HyracksDataException {
synchronized (ioAllocateCallbacks) {
for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) {
callback.before(component);
@@ -416,7 +416,7 @@
}
}
- public void allocatedReturned(ILSMMemoryComponent component) {
+ public void allocatedReturned(ILSMMemoryComponent component) throws HyracksDataException {
synchronized (ioAllocateCallbacks) {
for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) {
callback.after();