[NO ISSUE][REP] Pass NC active partitions from CC
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Pass NC active partitions from CC and set them
during node bootstrap.
- Move local storage cleanup to LocalStorageCleanupTask
so that it runs after all node partitions have been set.
- Keep the last LSN in the master node flush LSN map to
account for cases where the first LSN received from master
is the same as the low watermark LSN.
- Reduce replica failure logging at error level.
Change-Id: I3782bb2be61f8a57ac45dd6dd6ae0942e83ddc40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12903
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 1d901bd..66a69ef 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -112,10 +112,17 @@
}
@Override
+ public synchronized void setActivePartitions(Set<Integer> activePartitions) {
+ partitions.clear();
+ partitions.addAll(activePartitions);
+ }
+
+ @Override
public synchronized void promote(int partition) throws HyracksDataException {
if (partitions.contains(partition)) {
return;
}
+ LOGGER.warn("promoting partition {}", partition);
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
localResourceRepository.cleanup(partition);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
index 5471fb8..a926c66 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.nc.task;
+import java.util.Set;
+
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
@@ -41,6 +43,17 @@
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
+ deleteInvalidMetadataIndexes(localResourceRepository);
+ final Set<Integer> nodePartitions = appContext.getReplicaManager().getPartitions();
+ localResourceRepository.deleteCorruptedResources();
+ //TODO optimize this to cleanup all active partitions at once
+ for (Integer partition : nodePartitions) {
+ localResourceRepository.cleanup(partition);
+ }
+ }
+
+ private void deleteInvalidMetadataIndexes(PersistentLocalResourceRepository localResourceRepository)
+ throws HyracksDataException {
localResourceRepository.deleteInvalidIndexes(r -> {
DatasetLocalResource lr = (DatasetLocalResource) r.getResource();
return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
index a1e11c2..fe579ad 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
@@ -18,25 +18,45 @@
*/
package org.apache.asterix.app.nc.task;
+import java.util.Set;
+
import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class UpdateNodeStatusTask implements INCLifecycleTask {
- private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 2L;
private final NodeStatus status;
+ private Set<Integer> activePartitions;
- public UpdateNodeStatusTask(NodeStatus status) {
+ public UpdateNodeStatusTask(NodeStatus status, Set<Integer> activePartitions) {
this.status = status;
+ this.activePartitions = activePartitions;
}
@Override
- public void perform(CcId ccId, IControllerService cs) {
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
NodeControllerService ncs = (NodeControllerService) cs;
ncs.setNodeStatus(status);
+ if (status != NodeStatus.ACTIVE) {
+ updateNodeActivePartitions(cs);
+ }
+ }
+
+ private void updateNodeActivePartitions(IControllerService cs) {
+ INcApplicationContext appCtx = (INcApplicationContext) cs.getApplicationContext();
+ IReplicaManager replicaManager = appCtx.getReplicaManager();
+ LOGGER.info("updating node active partitions to {}", activePartitions);
+ replicaManager.setActivePartitions(activePartitions);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 97e1a56..22a0a84 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -213,12 +213,13 @@
protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
- tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
+ Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode);
+ tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
if (state == SystemState.CORRUPTED) {
// need to perform local recovery for node active partitions
- LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
+ LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
tasks.add(rt);
}
if (replicationEnabled) {
@@ -243,7 +244,7 @@
tasks.add(new ExportMetadataNodeTask(true));
tasks.add(new BindMetadataNodeTask());
}
- tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE));
+ tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE, nodeActivePartitions));
return tasks;
}
@@ -300,6 +301,13 @@
return true;
}
+ protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> nodePartitions, boolean metadataNode) {
+ if (metadataNode) {
+ nodePartitions.add(clusterManager.getMetadataPartition().getPartitionId());
+ }
+ return nodePartitions;
+ }
+
private void notifyFailedReplica(IClusterStateManager clusterManager, String nodeID,
InetSocketAddress replicaAddress) {
LOGGER.info("notify replica failure of nodeId {} at {}", nodeID, replicaAddress);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index d8e1894..b3546cc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -73,7 +73,7 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
- Set<Integer> nodeActivePartitions = appCtx.getMetadataProperties().getNodeActivePartitions(nodeId);
+ Set<Integer> nodeActivePartitions = appCtx.getReplicaManager().getPartitions();
NCLifecycleTaskReportMessage result =
new NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions);
result.setException(exception);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 18e8e65..be1cc7c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -35,7 +35,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -82,7 +81,6 @@
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.translator.Receptionist;
import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.asterix.utils.RedactionUtil;
@@ -319,16 +317,7 @@
}
private void performLocalCleanUp() throws HyracksDataException {
- //Delete working area files from failed jobs
runtimeContext.getIoManager().deleteWorkspaceFiles();
- // Reclaim storage for orphaned index artifacts in NCs.
- final Set<Integer> nodePartitions = runtimeContext.getReplicaManager().getPartitions();
- final PersistentLocalResourceRepository localResourceRepository =
- (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
- localResourceRepository.deleteCorruptedResources();
- for (Integer partition : nodePartitions) {
- localResourceRepository.cleanup(partition);
- }
}
private void updateOnNodeJoin() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 806a6d4..ec05702 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -135,7 +135,7 @@
for (ILSMIndex lsmIndex : indexes) {
if (lsmIndex.isPrimaryIndex()) {
if (lsmIndex.isCurrentMutableComponentEmpty()) {
- LOGGER.info("Primary index on dataset {} and partition {} is empty... skipping flush",
+ LOGGER.debug("Primary index on dataset {} and partition {} is empty... skipping flush",
dsInfo.getDatasetID(), partition);
return;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 9630303..4f46227 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -57,6 +57,13 @@
Set<Integer> getPartitions();
/**
+ * Sets the node active partitions
+ *
+ * @param activePartitions
+ */
+ void setActivePartitions(Set<Integer> activePartitions);
+
+ /**
* Promotes a partition by making this node its master replica
*
* @param partition
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 832f3e9..24b9ae69 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -71,7 +71,7 @@
next.validComponentSequence = validComponentSequence;
next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
// remove any lsn from the map that wont be used anymore
- next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark && lsn != HAS_NULL_MISSING_VALUES_FIX);
+ next.masterNodeFlushMap.values().removeIf(lsn -> lsn < lowWatermark && lsn != HAS_NULL_MISSING_VALUES_FIX);
return next;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 6b97306..282d475 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -77,13 +77,17 @@
}
public synchronized void sync() {
+ sync(true);
+ }
+
+ public synchronized void sync(boolean register) {
if (status == IN_SYNC || status == CATCHING_UP) {
return;
}
setStatus(CATCHING_UP);
appCtx.getThreadExecutor().execute(() -> {
try {
- new ReplicaSynchronizer(appCtx, this).sync();
+ new ReplicaSynchronizer(appCtx, this).sync(register);
setStatus(IN_SYNC);
} catch (Exception e) {
LOGGER.error(() -> "Failed to sync replica " + this, e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 440f8ef..57463cb 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -70,7 +70,7 @@
+ System.lineSeparator()).getBytes());
break;
case LogType.FLUSH:
- checkpointReplicaIndexes(logRecord, logRecord.getDatasetId());
+ checkpointReplicaIndexes(logRecord, logRecord.getDatasetId(), logRecord.getResourcePartition());
break;
default:
throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType());
@@ -83,11 +83,13 @@
}
}
- private void checkpointReplicaIndexes(RemoteLogRecord remoteLogMapping, int datasetId) throws HyracksDataException {
+ private void checkpointReplicaIndexes(RemoteLogRecord remoteLogMapping, int datasetId, int resourcePartition)
+ throws HyracksDataException {
final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions();
final Predicate<LocalResource> replicaIndexesPredicate = lr -> {
DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
- return dls.getDatasetId() == datasetId && !masterPartitions.contains(dls.getPartition());
+ return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition
+ && !masterPartitions.contains(dls.getPartition());
};
final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate);
final List<DatasetResourceReference> replicaIndexesRef =
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..48eb8e3 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -83,8 +83,9 @@
if (failedDest.contains(dest)) {
return;
}
- LOGGER.error("Replica failed", e);
+ LOGGER.debug("Replica failed", e);
if (destinations.contains(dest)) {
+ LOGGER.error("replica at {} failed", dest);
failedDest.add(dest);
}
replicationManager.notifyFailure(dest, e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..f1d8d4d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -215,11 +215,15 @@
if (failedSockets.contains(replicaSocket)) {
return;
}
- LOGGER.error("Replica failed", e);
+ LOGGER.debug("Replica failed", e);
failedSockets.add(replicaSocket);
Optional<ReplicationDestination> socketDest = destinations.entrySet().stream()
.filter(entry -> entry.getValue().equals(replicaSocket)).map(Map.Entry::getKey).findFirst();
- socketDest.ifPresent(dest -> replicationManager.notifyFailure(dest, e));
+ if (socketDest.isPresent()) {
+ ReplicationDestination dest = socketDest.get();
+ LOGGER.error("replica at {} failed", dest);
+ replicationManager.notifyFailure(dest, e);
+ }
}
private class TxnAckListener implements Runnable {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 261236c..3209a98 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -43,7 +43,7 @@
this.replica = replica;
}
- public void sync() throws IOException {
+ public void sync(boolean register) throws IOException {
synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
try {
@@ -51,7 +51,9 @@
checkpointManager.suspend();
syncFiles();
checkpointReplicaIndexes();
- appCtx.getReplicationManager().register(replica);
+ if (register) {
+ appCtx.getReplicationManager().register(replica);
+ }
} finally {
checkpointManager.resume();
}