[NO ISSUE][REP] Send correct component ID on bulkload
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- When a secondary index is created, send the correct component id
to the replica.
- Update the component id on replicas when receiving a bulkloaded
component.
Change-Id: I5575cf9a61477636efc7e1291189a59e0a5266ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14103
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 2d40ec8..96e89a7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -169,11 +169,12 @@
}
@Override
- public synchronized void advanceValidComponentSequence(long componentSequence) throws HyracksDataException {
+ public synchronized void advanceValidComponent(long componentSequence, long componentId)
+ throws HyracksDataException {
final IndexCheckpoint latest = getLatest();
if (componentSequence > latest.getValidComponentSequence()) {
- final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence,
- latest.getLastComponentId(), null);
+ final IndexCheckpoint next =
+ IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence, componentId, null);
persist(next);
}
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
index 50118fc..59efef4 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -189,7 +189,7 @@
DatasetResourceReference drr = DatasetResourceReference.of(localResource);
IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider();
IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr);
- indexCheckpointManager.advanceValidComponentSequence(validComponentSequence);
+ indexCheckpointManager.advanceValidComponent(validComponentSequence, 1);
// create components to be merged
String btree = "_b";
String filter = "_f";
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index f9230dd..beb8e07 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -125,12 +125,13 @@
IndexCheckpoint getLatest() throws HyracksDataException;
/**
- * Advance the last valid component sequence. Used for replicated bulkloaded components
+ * Advance the last valid component. Used for replicated bulkloaded components
*
* @param componentSequence
+ * @param componentId
* @throws HyracksDataException
*/
- void advanceValidComponentSequence(long componentSequence) throws HyracksDataException;
+ void advanceValidComponent(long componentSequence, long componentId) throws HyracksDataException;
/**
* Set the last component id. Used during recovery or after component delete
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index c92a527..1ea076d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -77,7 +77,7 @@
final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
- indexCheckpointManager.advanceValidComponentSequence(componentSequence);
+ indexCheckpointManager.advanceValidComponent(componentSequence, lastComponentId);
}
private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 48ad5a5..00e63ec 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -141,11 +141,12 @@
private long getReplicatedComponentId() throws HyracksDataException {
final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job;
- if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
+ if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH
+ && indexReplJob.getLSMOpType() != LSMOperationType.LOAD) {
return -1L;
}
final ILSMIndexOperationContext ctx = indexReplJob.getLSMIndexOperationContext();
LSMComponentId id = (LSMComponentId) ctx.getComponentsToBeReplicated().get(0).getId();
- return id.getMinId();
+ return id.getMaxId();
}
}