Merge commit 'fa6ce67' from stabilization-f69489
Change-Id: I01e1810f1c44d9ecaf569dd22577c33e8702c682
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 954c209..10eddb1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -135,4 +135,12 @@
* @throws HyracksDataException
*/
void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+
+ /**
+ * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}.
+ *
+ * @param replicationStrategy
+ * @throws HyracksDataException
+ */
+ void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 5cde36c..d396d9b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -594,6 +594,15 @@
}
}
+ @Override
+ public void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+ for (DatasetResource dsr : datasets.values()) {
+ if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
+ dsr.getDatasetInfo().waitForIO();
+ }
+ }
+ }
+
private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
if (indexInfo.isOpen()) {
ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 09f1205..0f0b5bd 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -53,10 +53,12 @@
private void syncFiles() throws IOException {
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
+ waitForReplicatedDatasetsIO();
fileSync.sync();
// flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
+ waitForReplicatedDatasetsIO();
// sync any newly generated files
fileSync.sync();
}
@@ -75,4 +77,10 @@
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
return localResourceRepository.getReplicatedIndexesMaxComponentId(partition, replStrategy);
}
+
+ private void waitForReplicatedDatasetsIO() throws HyracksDataException {
+ // wait for IO operations to ensure replicated datasets files won't change during replica sync
+ final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ appCtx.getDatasetLifecycleManager().waitForIO(replStrategy);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
index 8b93d07..20e9ff6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java
@@ -58,12 +58,12 @@
if (!(o instanceof FileReference)) {
return false;
}
- return path.equals(((FileReference) o).path) && dev.equals(((FileReference) o).dev);
+ return file.getAbsolutePath().equals(((FileReference) o).getAbsolutePath());
}
@Override
public int hashCode() {
- return path.hashCode();
+ return file.getAbsolutePath().hashCode();
}
/**