[NO ISSUE][*DB][STO] indicate null/missing value fix in index checkpoint
- user model changes: no
- storage format changes: yes (backwards-compatible)
- interface changes: no
Details:
- add marker on inintial IndexCheckpoint, propagate on subsequent checkpoints
- ignore extra JSON fields when deserializing IndexCheckpoint
Change-Id: Id93ab9d16887b37cf6c0d011950e7c57f1a1d646
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3324
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
index a0ed26e..bb9c10e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/CheckpointInSecondaryIndexTest.java
@@ -54,6 +54,7 @@
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable;
+import org.apache.asterix.test.base.TestMethodTracer;
import org.apache.asterix.test.common.TestHelper;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -79,7 +80,11 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
import org.junit.runners.Parameterized;
public class CheckpointInSecondaryIndexTest {
@@ -101,7 +106,7 @@
private static final boolean[] UNIQUE_META_FIELDS = null;
private static final int[] KEY_INDEXES = { 0 };
private static final int[] KEY_INDICATORS = { Index.RECORD_INDICATOR };
- private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR });
+ private static final List<Integer> KEY_INDICATORS_LIST = Collections.singletonList(Index.RECORD_INDICATOR);
private static final int RECORDS_PER_COMPONENT = 500;
private static final int DATASET_ID = 101;
private static final String DATAVERSE_NAME = "TestDV";
@@ -113,9 +118,9 @@
private static final IFieldValueGenerator[] SECONDARY_INDEX_VALUE_GENERATOR =
{ new AInt64ValueGenerator(), new AInt32ValueGenerator() };
private static final List<List<String>> INDEX_FIELD_NAMES =
- Arrays.asList(Arrays.asList(RECORD_TYPE.getFieldNames()[1]));
- private static final List<Integer> INDEX_FIELD_INDICATORS = Arrays.asList(Index.RECORD_INDICATOR);
- private static final List<IAType> INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);
+ Collections.singletonList(Collections.singletonList(RECORD_TYPE.getFieldNames()[1]));
+ private static final List<Integer> INDEX_FIELD_INDICATORS = Collections.singletonList(Index.RECORD_INDICATOR);
+ private static final List<IAType> INDEX_FIELD_TYPES = Collections.singletonList(BuiltinType.AINT64);
private static final StorageComponentProvider storageManager = new StorageComponentProvider();
private static TestNodeController nc;
private static NCAppRuntimeContext ncAppCtx;
@@ -154,6 +159,9 @@
TestHelper.deleteExistingInstanceFiles();
}
+ @Rule
+ public TestRule tracer = new TestMethodTracer();
+
@Before
public void createIndex() throws Exception {
List<List<String>> partitioningKeys = new ArrayList<>();
@@ -206,50 +214,47 @@
@Test
public void testCheckpointUpdatedWhenSecondaryIsEmpty() throws Exception {
- try {
- // create secondary
- createSecondaryIndex();
- actor.add(new Request(Request.Action.INSERT_PATCH));
- ensureDone(actor);
- // search now and ensure partition 0 has all the records
- StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
- // and that secondary index is empty
- Assert.assertTrue(secondaryLsmBtree.isCurrentMutableComponentEmpty());
- // flush
- actor.add(new Request(Request.Action.FLUSH_DATASET));
- ensureDone(actor);
- // ensure primary has a component
- Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+ // create secondary
+ createSecondaryIndex();
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ // search now and ensure partition 0 has all the records
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+ // and that secondary index is empty
+ Assert.assertTrue(secondaryLsmBtree.isCurrentMutableComponentEmpty());
+ // flush
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ // ensure primary has a component
+ Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
- // ensure secondary doesn't have a component
- Assert.assertEquals(0, secondaryLsmBtree.getDiskComponents().size());
- // ensure that current memory component index match
- Assert.assertEquals(secondaryLsmBtree.getCurrentMemoryComponentIndex(),
- primaryLsmBtree.getCurrentMemoryComponentIndex());
- // ensure both checkpoint files has the same component id as the last flushed component id
- ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
- LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
- long min = id.getMinId();
- // primary ref
- Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
- fileManagerField.setAccessible(true); //Make it accessible so you can access it
- ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
- final ResourceReference primaryRef = ResourceReference
- .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
- // secondary ref
- ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
- final ResourceReference secondaryRef = ResourceReference.of(
- secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
- IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
- Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
- Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
- }
+ // ensure secondary doesn't have a component
+ Assert.assertEquals(0, secondaryLsmBtree.getDiskComponents().size());
+ // ensure that current memory component index match
+ Assert.assertEquals(secondaryLsmBtree.getCurrentMemoryComponentIndex(),
+ primaryLsmBtree.getCurrentMemoryComponentIndex());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
+ long min = id.getMinId();
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ // secondary ref
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference
+ .of(secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
+ Assert.assertTrue(latestPrimaryCheckpoint.hasNullMissingValuesFix());
+ Assert.assertTrue(latestSecondaryCheckpoint.hasNullMissingValuesFix());
}
private void createSecondaryIndex()
@@ -266,204 +271,192 @@
@Test
public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsSingleComponent() throws Exception {
- try {
- // create secondary
- actor.add(new Request(Request.Action.INSERT_PATCH));
- ensureDone(actor);
- // search now and ensure partition 0 has all the records
- StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
- // flush
- actor.add(new Request(Request.Action.FLUSH_DATASET));
- ensureDone(actor);
- // ensure primary has a component
- Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
- // ensure both checkpoint files has the same component id as the last flushed component id
- ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
- LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
- long min = id.getMinId();
- // primary ref
- Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
- fileManagerField.setAccessible(true); //Make it accessible so you can access it
- ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
- final ResourceReference primaryRef = ResourceReference
- .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
- IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
- createSecondaryIndex();
- JobId jobId = nc.newJobId();
- loadTaskCtx = nc.createTestContext(jobId, 0, false);
- Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
- nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
- KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
- indexLoadOp = infoAndOp.getRight();
- secondaryIndexInfo = infoAndOp.getLeft();
- actor.add(new Request(Request.Action.LOAD_OPEN));
- actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
- actor.add(new Request(Request.Action.LOAD_CLOSE));
- ensureDone(actor);
- latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
- ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
- final ResourceReference secondaryRef = ResourceReference.of(
- secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
- IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
- Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
- }
+ // create secondary
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ // search now and ensure partition 0 has all the records
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+ // flush
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ // ensure primary has a component
+ Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
+ long min = id.getMinId();
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ createSecondaryIndex();
+ JobId jobId = nc.newJobId();
+ loadTaskCtx = nc.createTestContext(jobId, 0, false);
+ Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+ nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+ indexLoadOp = infoAndOp.getRight();
+ secondaryIndexInfo = infoAndOp.getLeft();
+ actor.add(new Request(Request.Action.LOAD_OPEN));
+ actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
+ actor.add(new Request(Request.Action.LOAD_CLOSE));
+ ensureDone(actor);
+ latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference
+ .of(secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
+ Assert.assertTrue(latestPrimaryCheckpoint.hasNullMissingValuesFix());
+ Assert.assertTrue(latestSecondaryCheckpoint.hasNullMissingValuesFix());
}
@Test
public void testCheckpointWhenBulkloadingSecondaryAndPrimaryIsTwoComponents() throws Exception {
- try {
- // create secondary
- actor.add(new Request(Request.Action.INSERT_PATCH));
- ensureDone(actor);
- // search now and ensure partition 0 has all the records
- StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
- // flush
- actor.add(new Request(Request.Action.FLUSH_DATASET));
- ensureDone(actor);
- // ensure primary has a component
- Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
- // ensure both checkpoint files has the same component id as the last flushed component id
- ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
- LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
- long min = id.getMinId();
- // primary ref
- Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
- fileManagerField.setAccessible(true); //Make it accessible so you can access it
- ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
- final ResourceReference primaryRef = ResourceReference
- .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
- IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
- actor.add(new Request(Request.Action.INSERT_PATCH));
- ensureDone(actor);
- actor.add(new Request(Request.Action.FLUSH_DATASET));
- ensureDone(actor);
- Assert.assertEquals(2, primaryLsmBtree.getDiskComponents().size());
- // ensure both checkpoint files has the same component id as the last flushed component id
- primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
- id = (LSMComponentId) primaryDiskComponent.getId();
- min = id.getMaxId();
- createSecondaryIndex();
- JobId jobId = nc.newJobId();
- loadTaskCtx = nc.createTestContext(jobId, 0, false);
- Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
- nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
- KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
- indexLoadOp = infoAndOp.getRight();
- secondaryIndexInfo = infoAndOp.getLeft();
- actor.add(new Request(Request.Action.LOAD_OPEN));
- actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
- actor.add(new Request(Request.Action.LOAD_CLOSE));
- ensureDone(actor);
- latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
- ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
- final ResourceReference secondaryRef = ResourceReference.of(
- secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
- IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
- Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
- }
+ // create secondary
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ // search now and ensure partition 0 has all the records
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+ // flush
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ // ensure primary has a component
+ Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
+ long min = id.getMinId();
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ Assert.assertEquals(2, primaryLsmBtree.getDiskComponents().size());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ id = (LSMComponentId) primaryDiskComponent.getId();
+ min = id.getMaxId();
+ createSecondaryIndex();
+ JobId jobId = nc.newJobId();
+ loadTaskCtx = nc.createTestContext(jobId, 0, false);
+ Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+ nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+ indexLoadOp = infoAndOp.getRight();
+ secondaryIndexInfo = infoAndOp.getLeft();
+ actor.add(new Request(Request.Action.LOAD_OPEN));
+ actor.add(new Request(Request.Action.INDEX_LOAD_PATCH));
+ actor.add(new Request(Request.Action.LOAD_CLOSE));
+ ensureDone(actor);
+ latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference
+ .of(secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
+ Assert.assertTrue(latestPrimaryCheckpoint.hasNullMissingValuesFix());
+ Assert.assertTrue(latestSecondaryCheckpoint.hasNullMissingValuesFix());
}
@Test
public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsEmpty() throws Exception {
- try {
- // ensure primary has no component
- Assert.assertEquals(0, primaryLsmBtree.getDiskComponents().size());
- // primary ref
- Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
- fileManagerField.setAccessible(true); //Make it accessible so you can access it
- ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
- final ResourceReference primaryRef = ResourceReference
- .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
- IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- createSecondaryIndex();
- JobId jobId = nc.newJobId();
- loadTaskCtx = nc.createTestContext(jobId, 0, false);
- Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
- nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
- KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
- indexLoadOp = infoAndOp.getRight();
- secondaryIndexInfo = infoAndOp.getLeft();
- actor.add(new Request(Request.Action.LOAD_OPEN));
- actor.add(new Request(Request.Action.LOAD_CLOSE));
- ensureDone(actor);
- latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
- final ResourceReference secondaryRef = ResourceReference.of(
- secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
- IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
- Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(),
- latestPrimaryCheckpoint.getLastComponentId());
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
- }
+ // ensure primary has no component
+ Assert.assertEquals(0, primaryLsmBtree.getDiskComponents().size());
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ createSecondaryIndex();
+ JobId jobId = nc.newJobId();
+ loadTaskCtx = nc.createTestContext(jobId, 0, false);
+ Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+ nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+ indexLoadOp = infoAndOp.getRight();
+ secondaryIndexInfo = infoAndOp.getLeft();
+ actor.add(new Request(Request.Action.LOAD_OPEN));
+ actor.add(new Request(Request.Action.LOAD_CLOSE));
+ ensureDone(actor);
+ latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference
+ .of(secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(),
+ latestPrimaryCheckpoint.getLastComponentId());
+ Assert.assertTrue(latestPrimaryCheckpoint.hasNullMissingValuesFix());
+ Assert.assertTrue(latestSecondaryCheckpoint.hasNullMissingValuesFix());
}
@Test
public void testCheckpointWhenBulkloadedSecondaryIsEmptyAndPrimaryIsNotEmpty() throws Exception {
- try {
- // create secondary
- actor.add(new Request(Request.Action.INSERT_PATCH));
- ensureDone(actor);
- // search now and ensure partition 0 has all the records
- StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
- // flush
- actor.add(new Request(Request.Action.FLUSH_DATASET));
- ensureDone(actor);
- // ensure primary has a component
- Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
- // ensure both checkpoint files has the same component id as the last flushed component id
- ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
- LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
- long min = id.getMinId();
- // primary ref
- Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
- fileManagerField.setAccessible(true); //Make it accessible so you can access it
- ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
- final ResourceReference primaryRef = ResourceReference
- .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
- IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
- createSecondaryIndex();
- JobId jobId = nc.newJobId();
- loadTaskCtx = nc.createTestContext(jobId, 0, false);
- Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
- nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
- KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
- indexLoadOp = infoAndOp.getRight();
- secondaryIndexInfo = infoAndOp.getLeft();
- actor.add(new Request(Request.Action.LOAD_OPEN));
- actor.add(new Request(Request.Action.LOAD_CLOSE));
- ensureDone(actor);
- latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
- Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
- ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
- final ResourceReference secondaryRef = ResourceReference.of(
- secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
- IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
- IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
- Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
- }
+ // create secondary
+ actor.add(new Request(Request.Action.INSERT_PATCH));
+ ensureDone(actor);
+ // search now and ensure partition 0 has all the records
+ StorageTestUtils.searchAndAssertCount(nc, 0, dataset, storageManager, RECORDS_PER_COMPONENT);
+ // flush
+ actor.add(new Request(Request.Action.FLUSH_DATASET));
+ ensureDone(actor);
+ // ensure primary has a component
+ Assert.assertEquals(1, primaryLsmBtree.getDiskComponents().size());
+ // ensure both checkpoint files has the same component id as the last flushed component id
+ ILSMDiskComponent primaryDiskComponent = primaryLsmBtree.getDiskComponents().get(0);
+ LSMComponentId id = (LSMComponentId) primaryDiskComponent.getId();
+ long min = id.getMinId();
+ // primary ref
+ Field fileManagerField = AbstractLSMIndex.class.getDeclaredField("fileManager"); //get private declared object from class
+ fileManagerField.setAccessible(true); //Make it accessible so you can access it
+ ILSMIndexFileManager primaryFileManager = (ILSMIndexFileManager) fileManagerField.get(primaryLsmBtree);
+ final ResourceReference primaryRef = ResourceReference
+ .of(primaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager primaryCheckpointManager = getIndexCheckpointManagerProvider().get(primaryRef);
+ IndexCheckpoint latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ createSecondaryIndex();
+ JobId jobId = nc.newJobId();
+ loadTaskCtx = nc.createTestContext(jobId, 0, false);
+ Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> infoAndOp =
+ nc.getBulkLoadSecondaryOperator(loadTaskCtx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ KEY_INDEXES, KEY_INDICATORS_LIST, storageManager, secondaryIndex, RECORDS_PER_COMPONENT);
+ indexLoadOp = infoAndOp.getRight();
+ secondaryIndexInfo = infoAndOp.getLeft();
+ actor.add(new Request(Request.Action.LOAD_OPEN));
+ actor.add(new Request(Request.Action.LOAD_CLOSE));
+ ensureDone(actor);
+ latestPrimaryCheckpoint = primaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestPrimaryCheckpoint.getLastComponentId(), min);
+ ILSMIndexFileManager secondaryFileManager = (ILSMIndexFileManager) fileManagerField.get(secondaryLsmBtree);
+ final ResourceReference secondaryRef = ResourceReference
+ .of(secondaryFileManager.getRelFlushFileReference().getInsertIndexFileReference().getAbsolutePath());
+ IIndexCheckpointManager secondaryCheckpointManager = getIndexCheckpointManagerProvider().get(secondaryRef);
+ IndexCheckpoint latestSecondaryCheckpoint = secondaryCheckpointManager.getLatest();
+ Assert.assertEquals(latestSecondaryCheckpoint.getLastComponentId(), min);
+ Assert.assertTrue(latestPrimaryCheckpoint.hasNullMissingValuesFix());
+ Assert.assertTrue(latestSecondaryCheckpoint.hasNullMissingValuesFix());
}
protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
@@ -566,9 +559,6 @@
default:
break;
}
- } catch (Throwable th) {
- th.printStackTrace();
- throw th;
} finally {
req.complete();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index cb34600..3d0b9cb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -27,14 +27,18 @@
import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class IndexCheckpoint {
private static final Logger LOGGER = LogManager.getLogger();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long INITIAL_CHECKPOINT_ID = 0;
+ // TODO(mblow): remove this marker & related logic once we no longer are able to read indexes prior to the fix
+ private static final long HAS_NULL_MISSING_VALUES_FIX = -1;
private long id;
private long validComponentSequence;
private long lowWatermark;
@@ -48,6 +52,7 @@
firstCheckpoint.validComponentSequence = lastComponentSequence;
firstCheckpoint.lastComponentId = validComponentId;
firstCheckpoint.masterNodeFlushMap = new HashMap<>();
+ firstCheckpoint.masterNodeFlushMap.put(HAS_NULL_MISSING_VALUES_FIX, HAS_NULL_MISSING_VALUES_FIX);
return firstCheckpoint;
}
@@ -66,7 +71,7 @@
next.validComponentSequence = validComponentSequence;
next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
// remove any lsn from the map that wont be used anymore
- next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark);
+ next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark && lsn != HAS_NULL_MISSING_VALUES_FIX);
return next;
}
@@ -94,6 +99,10 @@
return id;
}
+ public boolean hasNullMissingValuesFix() {
+ return masterNodeFlushMap.containsKey(HAS_NULL_MISSING_VALUES_FIX);
+ }
+
public String asJson() throws HyracksDataException {
try {
return OBJECT_MAPPER.writeValueAsString(this);