[NO ISSUE][REP] Add replica sync progress
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add replica sync progress based on the replica missing
files.
- Add replica last progress timestamp that can be used
to determine replica progress inactivity.
Change-Id: Iab2cd7e745c4150e2d0aef3af864ec0f66dd96e7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13063
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
index 761b2c6..f311655 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
@@ -51,4 +51,18 @@
* @param failure
*/
void notifyFailure(Exception failure);
+
+ /**
+ * Gets the current sync progress
+ *
+ * @return the current sync progress
+ */
+ double getSyncProgress();
+
+ /**
+ * Gets the last progress time of this replica based on System.nanoTime
+ *
+ * @return the last progress time
+ */
+ long getLastProgressTime();
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 282d475..e265d03 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -52,6 +52,8 @@
private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
private final INcApplicationContext appCtx;
private final ReplicaIdentifier id;
+ private double syncProgress = -1;
+ private long lastProgressTime = -1;
private ByteBuffer reusbaleBuf;
private PartitionReplicaStatus status = DISCONNECTED;
private ISocketChannel sc;
@@ -133,6 +135,16 @@
return reusbaleBuf;
}
+ public synchronized void setSyncProgress(double syncProgress) {
+ this.syncProgress = syncProgress;
+ lastProgressTime = System.nanoTime();
+ }
+
+ @Override
+ public synchronized double getSyncProgress() {
+ return syncProgress;
+ }
+
private JsonNode asJson() {
ObjectNode json = OBJECT_MAPPER.createObjectNode();
json.put("id", id.toString());
@@ -153,6 +165,19 @@
}
@Override
+ public synchronized long getLastProgressTime() {
+ switch (status) {
+ case IN_SYNC:
+ return System.nanoTime();
+ case CATCHING_UP:
+ return lastProgressTime;
+ case DISCONNECTED:
+ return -1;
+ }
+ return -1;
+ }
+
+ @Override
public int hashCode() {
return id.hashCode();
}
@@ -172,6 +197,17 @@
}
LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status);
this.status = status;
+ switch (status) {
+ case IN_SYNC:
+ syncProgress = 1;
+ break;
+ case CATCHING_UP:
+ lastProgressTime = System.nanoTime();
+ break;
+ case DISCONNECTED:
+ syncProgress = -1;
+ break;
+ }
}
private void sendGoodBye() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 3c93d17..faf3f54 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -105,7 +105,12 @@
final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
// sort files to ensure index metadata files starting with "." are replicated first
files.sort(String::compareTo);
- files.forEach(sync::replicate);
+ int missingFilesCount = files.size();
+ for (int i = 0; i < missingFilesCount; i++) {
+ String file = files.get(i);
+ sync.replicate(file);
+ replica.setSyncProgress((i + 1d) / missingFilesCount);
+ }
}
private void deleteInvalidFiles(List<String> files) {