Introduce Strategy-Based Replication and Fault-Tolerance
This change includes the following:
- Introduce new APIs for Replication and Fault-Tolerance Strategies.
- Add configuration in the cluster description file for high availability.
- Add built-in replication strategies (Metadata_Only, Chained_Declustering).
- Add built-in fault-tolerance strategies (Auto, Metadata_Node).
- Remove non-cluster-state functionality from ClusterStateManager.
- Add customizable NC startup sequence (a sketch follows this list).
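
For illustration, a minimal sketch (not part of this change) of a custom NC startup
task built on the INCLifecycleTask API introduced here. The class name and log
message are hypothetical; the imports and the perform(IControllerService) signature
follow the built-in tasks added in this patch (e.g. CheckpointTask):

    package org.apache.asterix.app.nc.task;

    import java.util.logging.Logger;

    import org.apache.asterix.common.api.INCLifecycleTask;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.api.service.IControllerService;
    import org.apache.hyracks.control.nc.NodeControllerService;

    public class LogStartupTask implements INCLifecycleTask {

        private static final Logger LOGGER = Logger.getLogger(LogStartupTask.class.getName());
        private static final long serialVersionUID = 1L;

        @Override
        public void perform(IControllerService cs) throws HyracksDataException {
            // Runs on the NC as one step of the startup sequence handed out by the
            // CC-side fault-tolerance strategy.
            NodeControllerService ncs = (NodeControllerService) cs;
            LOGGER.info("Custom startup task executed on " + ncs.getApplicationContext().getNodeId());
        }
    }

Such a task would be appended to the task list a fault-tolerance strategy returns for
a node (see buildStartupSequence in AutoFaultToleranceStrategy below).
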
Change-Id: I1d1012f5541ce786f127866efefb9f3db434fedd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1405
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 83233f1..54804a1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -18,7 +18,23 @@
*/
package org.apache.asterix.api.common;
-import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.PropertiesAccessor;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
+import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
import java.io.File;
import java.io.IOException;
@@ -30,21 +46,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.config.PropertiesAccessor;
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
-import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
+import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER;
public class AsterixHyracksIntegrationUtil {
static class LoggerHolder {
@@ -100,6 +102,12 @@
for (Thread thread : startupThreads) {
thread.join();
}
+ // Wait until cluster becomes active
+ synchronized (ClusterStateManager.INSTANCE) {
+ while (ClusterStateManager.INSTANCE.getState() != ClusterState.ACTIVE) {
+ ClusterStateManager.INSTANCE.wait();
+ }
+ }
hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b114e8c..b4b7a95 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -35,7 +35,6 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.BuildProperties;
-import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.config.ExternalProperties;
@@ -206,7 +205,7 @@
activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(),
feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
- if (ClusterProperties.INSTANCE.isReplicationEnabled()) {
+ if (replicationProperties.isParticipant(ncApplicationContext.getNodeId())) {
String nodeId = ncApplicationContext.getNodeId();
replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
@@ -225,10 +224,8 @@
* add the partitions that will be replicated in this node as inactive partitions
*/
//get nodes which replicate to this node
- Set<String> replicationClients = replicationProperties.getNodeReplicationClients(nodeId);
- //remove the node itself
- replicationClients.remove(nodeId);
- for (String clientId : replicationClients) {
+ Set<String> remotePrimaryReplicas = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
+ for (String clientId : remotePrimaryReplicas) {
//get the partitions of each client
ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId);
for (ClusterPartition partition : clientPartitions) {
@@ -475,5 +472,4 @@
public IStorageComponentProvider getStorageComponentProvider() {
return componentProvider;
}
-
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4edf991..2efb139 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -43,7 +43,8 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -97,9 +98,11 @@
this.appCtx = appCtx;
this.txnSubsystem = txnSubsystem;
logMgr = (LogManager) txnSubsystem.getLogManager();
- replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
- localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem
- .getAsterixAppRuntimeContextProvider().getLocalResourceRepository();
+ ReplicationProperties repProperties = ((IPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getAppContext()).getReplicationProperties();
+ replicationEnabled = repProperties.isParticipant(txnSubsystem.getId());
+ localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getLocalResourceRepository();
cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
checkpointManager = txnSubsystem.getCheckpointManager();
}
@@ -127,15 +130,15 @@
}
if (replicationEnabled) {
- if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN
- || (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.isSharp())) {
- //no logs exist or only remote logs exist
+ if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
+ //no logs exist
state = SystemState.HEALTHY;
- return state;
+ } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.isSharp()) {
+ //only remote logs exist
+ state = SystemState.HEALTHY;
} else {
//need to perform remote recovery
state = SystemState.CORRUPTED;
- return state;
}
} else {
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
@@ -145,16 +148,14 @@
//No choice but continuing when the log files are lost.
}
state = SystemState.HEALTHY;
- return state;
} else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
&& checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
state = SystemState.HEALTHY;
- return state;
} else {
state = SystemState.CORRUPTED;
- return state;
}
}
+ return state;
}
//This method is used only when replication is disabled.
@@ -179,6 +180,25 @@
}
@Override
+ public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
+ state = SystemState.RECOVERING;
+ LOGGER.log(Level.INFO, "starting recovery ...");
+
+ long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+ Checkpoint checkpointObject = checkpointManager.getLatest();
+ long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
+ if (lowWaterMarkLSN < readableSmallestLSN) {
+ lowWaterMarkLSN = readableSmallestLSN;
+ }
+
+ //delete any recovery files from previous failed recovery attempts
+ deleteRecoveryTemporaryFiles();
+
+ //get active partitions on this node
+ replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN);
+ }
+
+ @Override
public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
throws IOException, ACIDException {
try {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index 808a252..8e9463b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -21,11 +21,11 @@
import java.util.concurrent.Callable;
import java.util.logging.Logger;
-import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.CheckpointProperties;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
@@ -70,7 +70,11 @@
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
- final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
+ ReplicationProperties repProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider
+ .getAppContext()).getReplicationProperties();
+ IReplicationStrategy replicationStrategy = repProperties.getReplicationStrategy();
+ final boolean replicationEnabled = repProperties.isParticipant(id);
+
final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id);
checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled);
final Checkpoint latestCheckpoint = checkpointManager.getLatest();
@@ -80,14 +84,8 @@
latestCheckpoint.getStorageVersion(), StorageConstants.VERSION));
}
- ReplicationProperties asterixReplicationProperties = null;
- if (asterixAppRuntimeContextProvider != null) {
- asterixReplicationProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext())
- .getReplicationProperties();
- }
-
- if (asterixReplicationProperties != null && replicationEnabled) {
- this.logManager = new LogManagerWithReplication(this);
+ if (replicationEnabled) {
+ this.logManager = new LogManagerWithReplication(this, replicationStrategy);
} else {
this.logManager = new LogManager(this);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
new file mode 100644
index 0000000..d1edec0
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class BindMetadataNodeTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+ private final boolean exportStub;
+
+ public BindMetadataNodeTask(boolean exportStub) {
+ this.exportStub = exportStub;
+ }
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ try {
+ if (exportStub) {
+ runtimeContext.exportMetadataNodeStub();
+ } else {
+ runtimeContext.unexportMetadataNodeStub();
+ }
+ } catch (Exception e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
new file mode 100644
index 0000000..8ef6ae3
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class CheckpointTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager();
+ checkpointMgr.doSharpCheckpoint();
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
new file mode 100644
index 0000000..8e842a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ExternalLibrarySetupTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+ private final boolean metadataNode;
+
+ public ExternalLibrarySetupTask(boolean metadataNode) {
+ this.metadataNode = metadataNode;
+ }
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ try {
+ ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode);
+ } catch (Exception e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
new file mode 100644
index 0000000..203e453
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class LocalRecoveryTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+ private final Set<Integer> partitions;
+
+ public LocalRecoveryTask(Set<Integer> partitions) {
+ this.partitions = partitions;
+ }
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ try {
+ runtimeContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
+ } catch (IOException | ACIDException e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
new file mode 100644
index 0000000..b6d85d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class MetadataBootstrapTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ try {
+ SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
+ appContext.initializeMetadata(state == SystemState.NEW_UNIVERSE);
+ } catch (Exception e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
new file mode 100644
index 0000000..4574304
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RemoteRecoveryTask.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class RemoteRecoveryTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+ private final Map<String, Set<Integer>> recoveryPlan;
+
+ public RemoteRecoveryTask(Map<String, Set<Integer>> recoveryPlan) {
+ this.recoveryPlan = recoveryPlan;
+ }
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ runtimeContext.getRemoteRecoveryManager().doRemoteRecoveryPlan(recoveryPlan);
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
new file mode 100644
index 0000000..7c30a29
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReportMaxResourceIdTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ ReportMaxResourceIdMessage.send((NodeControllerService) cs);
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
new file mode 100644
index 0000000..6ae4487
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartFailbackTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class StartFailbackTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ runtimeContext.getRemoteRecoveryManager().startFailbackProcess();
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
new file mode 100644
index 0000000..3beb573
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.control.nc.application.NCApplicationContext;
+
+public class StartLifecycleComponentsTask implements INCLifecycleTask {
+
+ private static final Logger LOGGER = Logger.getLogger(StartLifecycleComponentsTask.class.getName());
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ NCApplicationContext appContext = ncs.getApplicationContext();
+ MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting lifecycle components");
+ }
+ Map<String, String> lifecycleMgmtConfiguration = new HashMap<>();
+ String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
+ String dumpPath = metadataProperties.getCoredumpPath(appContext.getNodeId());
+ lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Coredump directory for NC is: " + dumpPath);
+ }
+ ILifeCycleComponentManager lccm = appContext.getLifeCycleComponentManager();
+ lccm.configure(lifecycleMgmtConfiguration);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Configured:" + lccm);
+ }
+ appContext.setStateDumpHandler(new AsterixStateDumpHandler(appContext.getNodeId(), lccm.getDumpPath(), lccm));
+ lccm.startAll();
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
new file mode 100644
index 0000000..d060f61
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class StartReplicationServiceTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext runtimeContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ try {
+ //Open replication channel
+ runtimeContext.getReplicationChannel().start();
+ final IReplicationManager replicationManager = runtimeContext.getReplicationManager();
+ //Check the state of remote replicas
+ replicationManager.initializeReplicasState();
+ //Start replication after the state of remote replicas has been initialized.
+ replicationManager.startReplicationThreads();
+ } catch (Exception e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
new file mode 100644
index 0000000..084563f
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
+import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
+import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
+import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.StartFailbackTask;
+import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.replication.NodeFailbackPlan.FailbackPlanState;
+import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.app.replication.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.app.replication.message.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.util.FaultToleranceUtil;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+ private static final Logger LOGGER = Logger.getLogger(AutoFaultToleranceStrategy.class.getName());
+ private long clusterRequestId = 0;
+
+ private Set<String> failedNodes = new HashSet<>();
+ private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans = new LinkedList<>();
+ private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap = new HashMap<>();
+ private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = new HashMap<>();
+ private String currentMetadataNode;
+ private boolean metadataNodeActive = false;
+ private IClusterStateManager clusterManager;
+ private ICCMessageBroker messageBroker;
+ private IReplicationStrategy replicationStrategy;
+ private Set<String> pendingStartupCompletionNodes = new HashSet<>();
+
+ @Override
+ public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+ pendingStartupCompletionNodes.add(nodeId);
+ }
+
+ @Override
+ public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+ //if this node was waiting for failback and failed before it completed
+ if (failedNodes.contains(nodeId)) {
+ notifyFailbackPlansNodeFailure(nodeId);
+ revertFailedFailbackPlanEffects();
+ return;
+ }
+ //an active node failed
+ failedNodes.add(nodeId);
+ clusterManager.updateNodePartitions(nodeId, false);
+ if (nodeId.equals(currentMetadataNode)) {
+ metadataNodeActive = false;
+ clusterManager.updateMetadataNode(nodeId, metadataNodeActive);
+ }
+ validateClusterState();
+ FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE, clusterManager, messageBroker,
+ replicationStrategy);
+ notifyFailbackPlansNodeFailure(nodeId);
+ requestPartitionsTakeover(nodeId);
+ }
+
+ private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+ Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+ while (iterator.hasNext()) {
+ NodeFailbackPlan plan = iterator.next();
+ plan.notifyNodeFailure(nodeId);
+ }
+ }
+
+ private synchronized void revertFailedFailbackPlanEffects() {
+ Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
+ while (iterator.hasNext()) {
+ NodeFailbackPlan plan = iterator.next();
+ if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+ //TODO if the failing back node is still active, notify it to construct a new plan for it
+ iterator.remove();
+
+ //reassign the partitions that were supposed to be failed back to an active replica
+ requestPartitionsTakeover(plan.getNodeId());
+ }
+ }
+ }
+
+ private synchronized void requestPartitionsTakeover(String failedNodeId) {
+ //replica -> list of partitions to takeover
+ Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+ ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
+
+ //collect the partitions of the failed NC
+ List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
+ if (!lostPartitions.isEmpty()) {
+ for (ClusterPartition partition : lostPartitions) {
+ //find replicas for this partition
+ Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+ //find a replica that is still active
+ for (String replica : partitionReplicas) {
+ //TODO (mhubail) currently this assigns the partition to the first found active replica.
+ //It needs to be modified to consider load balancing.
+ if (addActiveReplica(replica, partition, partitionRecoveryPlan)) {
+ break;
+ }
+ }
+ }
+
+ if (partitionRecoveryPlan.size() == 0) {
+ //no active replicas were found for the failed node
+ LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
+ return;
+ } else {
+ LOGGER.info("Partitions to recover: " + lostPartitions);
+ }
+ //For each replica, send a request to takeover the assigned partitions
+ for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
+ String replica = entry.getKey();
+ Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
+ long requestId = clusterRequestId++;
+ TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
+ replica, partitionsToTakeover);
+ pendingTakeoverRequests.put(requestId, takeoverRequest);
+ try {
+ messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+ } catch (Exception e) {
+ /**
+ * if we fail to send the request, it means the NC we tried to send the request to
+ * has failed. When the failure notification arrives, we will send any pending request
+ * that belongs to the failed NC to a different active replica.
+ */
+ LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
+ }
+ }
+ }
+ }
+
+ private boolean addActiveReplica(String replica, ClusterPartition partition,
+ Map<String, List<Integer>> partitionRecoveryPlan) {
+ Map<String, Map<String, String>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
+ if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+ if (!partitionRecoveryPlan.containsKey(replica)) {
+ List<Integer> replicaPartitions = new ArrayList<>();
+ replicaPartitions.add(partition.getPartitionId());
+ partitionRecoveryPlan.put(replica, replicaPartitions);
+ } else {
+ partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+ NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+ pendingProcessingFailbackPlans.add(plan);
+ planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+ //get all partitions this node requires to resync
+ ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
+ Set<String> nodeReplicas = replicationProperties.getNodeReplicasIds(failingBackNodeId);
+ clusterManager.getClusterPartitons();
+ for (String replicaId : nodeReplicas) {
+ ClusterPartition[] nodePartitions = clusterManager.getNodePartitions(replicaId);
+ for (ClusterPartition partition : nodePartitions) {
+ plan.addParticipant(partition.getActiveNodeId());
+ /**
+ * if the partition original node is the returning node,
+ * add it to the list of the partitions which will be failed back
+ */
+ if (partition.getNodeId().equals(failingBackNodeId)) {
+ plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
+ }
+ }
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Prepared Failback plan: " + plan.toString());
+ }
+
+ processPendingFailbackPlans();
+ }
+
+ private synchronized void processPendingFailbackPlans() {
+ ClusterState state = clusterManager.getState();
+ /**
+ * if the cluster state is not ACTIVE, then failbacks should not be processed
+ * since some partitions are not active
+ */
+ if (state == ClusterState.ACTIVE) {
+ while (!pendingProcessingFailbackPlans.isEmpty()) {
+ //take the first pending failback plan
+ NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
+ /**
+ * A plan at this stage will be in one of two states:
+ * 1. PREPARING -> the participants were selected but we haven't sent any request.
+ * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
+ */
+ if (plan.getState() == FailbackPlanState.PREPARING) {
+ //set the partitions that will be failed back as inactive
+ String failbackNode = plan.getNodeId();
+ for (Integer partitionId : plan.getPartitionsToFailback()) {
+ //partition expected to be returned to the failing back node
+ clusterManager.updateClusterPartition(partitionId, failbackNode, false);
+ }
+
+ /**
+ * if the returning node is the original metadata node,
+ * then metadata node will change after the failback completes
+ */
+ String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+ if (originalMetadataNode.equals(failbackNode)) {
+ plan.setNodeToReleaseMetadataManager(currentMetadataNode);
+ currentMetadataNode = "";
+ metadataNodeActive = false;
+ clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
+ }
+
+ //force new jobs to wait
+ clusterManager.setState(ClusterState.REBALANCING);
+ handleFailbackRequests(plan, messageBroker);
+ /**
+ * wait until the current plan is completed before processing the next plan.
+ * when the current one completes or is reverted, the cluster state will be
+ * ACTIVE again, and the next failback plan (if any) will be processed.
+ */
+ break;
+ } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+ //this plan failed before sending any requests -> nothing to rollback
+ planId2FailbackPlanMap.remove(plan.getPlanId());
+ }
+ }
+ }
+ }
+
+ private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
+ //send requests to other nodes to complete on-going jobs and prepare partitions for failback
+ for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
+ try {
+ messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
+ plan.addPendingRequest(request);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
+ plan.notifyNodeFailure(request.getNodeID());
+ revertFailedFailbackPlanEffects();
+ break;
+ }
+ }
+ }
+
+ public synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+ List<ClusterPartition> nodePartitions = new ArrayList<>();
+ ClusterPartition[] clusterPartitons = clusterManager.getClusterPartitons();
+ Map<Integer, ClusterPartition> clusterPartitionsMap = new HashMap<>();
+ for (ClusterPartition partition : clusterPartitons) {
+ clusterPartitionsMap.put(partition.getPartitionId(), partition);
+ }
+ for (ClusterPartition partition : clusterPartitons) {
+ if (partition.getActiveNodeId().equals(nodeId)) {
+ nodePartitions.add(partition);
+ }
+ }
+ /**
+ * if there is any pending takeover request this node was supposed to handle,
+ * it needs to be sent to a different replica
+ */
+ List<Long> failedTakeoverRequests = new ArrayList<>();
+ for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
+ if (request.getNodeId().equals(nodeId)) {
+ for (Integer partitionId : request.getPartitions()) {
+ nodePartitions.add(clusterPartitionsMap.get(partitionId));
+ }
+ failedTakeoverRequests.add(request.getRequestId());
+ }
+ }
+
+ //remove failed requests
+ for (Long requestId : failedTakeoverRequests) {
+ pendingTakeoverRequests.remove(requestId);
+ }
+ return nodePartitions;
+ }
+
+ public synchronized void process(TakeoverPartitionsResponseMessage response) throws HyracksDataException {
+ for (Integer partitonId : response.getPartitions()) {
+ clusterManager.updateClusterPartition(partitonId, response.getNodeId(), true);
+ }
+ pendingTakeoverRequests.remove(response.getRequestId());
+ validateClusterState();
+ }
+
+ public synchronized void process(TakeoverMetadataNodeResponseMessage response) throws HyracksDataException {
+ currentMetadataNode = response.getNodeId();
+ metadataNodeActive = true;
+ clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
+ validateClusterState();
+ }
+
+ private void validateClusterState() throws HyracksDataException {
+ clusterManager.refreshState();
+ ClusterState newState = clusterManager.getState();
+ // PENDING: all partitions are active but metadata node is not
+ if (newState == ClusterState.PENDING) {
+ requestMetadataNodeTakeover();
+ } else if (newState == ClusterState.ACTIVE) {
+ processPendingFailbackPlans();
+ }
+ }
+
+ public synchronized void process(PreparePartitionsFailbackResponseMessage msg) {
+ NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+ plan.markRequestCompleted(msg.getRequestId());
+ /**
+ * A plan at this stage will be in one of three states:
+ * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
+ * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
+ * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
+ */
+ if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+ CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+ //send complete resync and takeover partitions to the failing back node
+ try {
+ messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
+ notifyFailbackPlansNodeFailure(request.getNodeId());
+ revertFailedFailbackPlanEffects();
+ }
+ } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+ revertFailedFailbackPlanEffects();
+ }
+ }
+
+ public synchronized void process(CompleteFailbackResponseMessage response) throws HyracksDataException {
+ /**
+ * the failback plan completed successfully:
+ * Remove all references to it.
+ * Remove the failing back node from the failed nodes list.
+ * Notify its replicas to reconnect to it.
+ * Set the failing back node partitions as active.
+ */
+ NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
+ String nodeId = plan.getNodeId();
+ failedNodes.remove(nodeId);
+ //notify impacted replicas they can reconnect to this node
+ FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager, messageBroker,
+ replicationStrategy);
+ clusterManager.updateNodePartitions(nodeId, true);
+ validateClusterState();
+ }
+
+ private synchronized void requestMetadataNodeTakeover() {
+ //need a new node to takeover metadata node
+ ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition();
+ //request the metadataPartition node to register itself as the metadata node
+ TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+ try {
+ messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
+ // Since the metadata node will be changed, we need to rebind the proxy object
+ MetadataManager.INSTANCE.rebindMetadataNode();
+ } catch (Exception e) {
+
+ /**
+ * if we fail to send the request, it means the NC we tried to send the request to
+ * has failed. When the failure notification arrives, a new NC will be assigned to
+ * the metadata partition and a new metadata node takeover request will be sent to it.
+ */
+ LOGGER.log(Level.WARNING,
+ "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
+ }
+ }
+
+ @Override
+ public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) {
+ AutoFaultToleranceStrategy ft = new AutoFaultToleranceStrategy();
+ ft.messageBroker = messageBroker;
+ ft.replicationStrategy = replicationStrategy;
+ return ft;
+ }
+
+ @Override
+ public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
+ switch (message.getType()) {
+ case STARTUP_TASK_REQUEST:
+ process((StartupTaskRequestMessage) message);
+ break;
+ case STARTUP_TASK_RESULT:
+ process((NCLifecycleTaskReportMessage) message);
+ break;
+ case TAKEOVER_PARTITION_RESPONSE:
+ process((TakeoverPartitionsResponseMessage) message);
+ break;
+ case TAKEOVER_METADATA_NODE_RESPONSE:
+ process((TakeoverMetadataNodeResponseMessage) message);
+ break;
+ case PREPARE_FAILBACK_RESPONSE:
+ process((PreparePartitionsFailbackResponseMessage) message);
+ break;
+ case COMPLETE_FAILBACK_RESPONSE:
+ process((CompleteFailbackResponseMessage) message);
+ break;
+ default:
+ throw new HyracksDataException("Unsupported message type: " + message.getType().name());
+ }
+ }
+
+ private synchronized void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
+ final String nodeId = msg.getNodeId();
+ pendingStartupCompletionNodes.remove(nodeId);
+ if (msg.isSuccess()) {
+ if (failedNodes.contains(nodeId)) {
+ prepareFailbackPlan(nodeId);
+ return;
+ }
+ // If this node failed and recovered, notify impacted replicas to reconnect to it
+ if (replicationStrategy.isParticipant(nodeId) && failedNodes.remove(nodeId)) {
+ FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager,
+ messageBroker, replicationStrategy);
+ }
+ clusterManager.updateNodePartitions(nodeId, true);
+ if (msg.getNodeId().equals(currentMetadataNode)) {
+ clusterManager.updateMetadataNode(currentMetadataNode, true);
+ }
+ clusterManager.refreshState();
+ } else {
+ LOGGER.log(Level.SEVERE, msg.getNodeId() + " failed to complete startup. ", msg.getException());
+ }
+ }
+
+ @Override
+ public void bindTo(IClusterStateManager clusterManager) {
+ this.clusterManager = clusterManager;
+ currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
+ }
+
+ private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ final String nodeId = msg.getNodeId();
+ final SystemState state = msg.getState();
+ List<INCLifecycleTask> tasks;
+ if (state == SystemState.INITIAL_RUN || state == SystemState.HEALTHY) {
+ tasks = buildStartupSequence(nodeId);
+ } else {
+ // failed node returned. Need to start failback process
+ tasks = buildFailbackStartupSequence();
+ }
+ StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ try {
+ messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
+ } catch (Exception e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ private List<INCLifecycleTask> buildFailbackStartupSequence() {
+ final List<INCLifecycleTask> tasks = new ArrayList<>();
+ tasks.add(new StartFailbackTask());
+ tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new StartLifecycleComponentsTask());
+ return tasks;
+ }
+
+ private List<INCLifecycleTask> buildStartupSequence(String nodeId) {
+ final List<INCLifecycleTask> tasks = new ArrayList<>();
+ tasks.add(new StartReplicationServiceTask());
+ final boolean isMetadataNode = nodeId.equals(currentMetadataNode);
+ if (isMetadataNode) {
+ tasks.add(new MetadataBootstrapTask());
+ }
+ tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
+ tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new CheckpointTask());
+ tasks.add(new StartLifecycleComponentsTask());
+ if (isMetadataNode) {
+ tasks.add(new BindMetadataNodeTask(true));
+ }
+ return tasks;
+ }
+}
\ No newline at end of file
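The methods overridden above (from, bindTo, notifyNodeJoin, notifyNodeFailure, and process) are the IFaultToleranceStrategy hooks this change introduces. A minimal custom strategy could look like the sketch below; it assumes the interface declares only the methods seen in this patch, and the class name and its no-op behavior are illustrative rather than part of the change.

    package org.apache.asterix.app.replication;

    import org.apache.asterix.common.cluster.IClusterStateManager;
    import org.apache.asterix.common.messaging.api.ICCMessageBroker;
    import org.apache.asterix.common.replication.IFaultToleranceStrategy;
    import org.apache.asterix.common.replication.INCLifecycleMessage;
    import org.apache.asterix.common.replication.IReplicationStrategy;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Illustrative custom strategy; not part of this change.
    public class NoOpFaultToleranceStrategySketch implements IFaultToleranceStrategy {

        private IClusterStateManager clusterManager;
        private IReplicationStrategy replicationStrategy;
        private ICCMessageBroker messageBroker;

        @Override
        public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) {
            // Acts as a prototype: return a configured copy, as the built-in strategies do.
            NoOpFaultToleranceStrategySketch ft = new NoOpFaultToleranceStrategySketch();
            ft.replicationStrategy = replicationStrategy;
            ft.messageBroker = messageBroker;
            return ft;
        }

        @Override
        public void bindTo(IClusterStateManager clusterManager) {
            this.clusterManager = clusterManager;
        }

        @Override
        public void notifyNodeJoin(String nodeId) throws HyracksDataException {
            // A real strategy would track the pending node and wait for its startup report.
        }

        @Override
        public void notifyNodeFailure(String nodeId) throws HyracksDataException {
            // Mark the node's partitions inactive and let the cluster state be recomputed.
            clusterManager.updateNodePartitions(nodeId, false);
            clusterManager.refreshState();
        }

        @Override
        public void process(INCLifecycleMessage message) throws HyracksDataException {
            // Dispatch on message.getType(), as the built-in strategies above do.
        }
    }

The from() method effectively turns each strategy into a prototype: the factory instantiates the registered class and calls from() to obtain a configured instance, and bindTo() later hands it the cluster state manager before lifecycle messages start flowing.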
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
new file mode 100644
index 0000000..8d382a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class FaultToleranceStrategyFactory {
+
+ private static final Map<String, Class<? extends IFaultToleranceStrategy>>
+ BUILT_IN_FAULT_TOLERANCE_STRATEGY = new HashMap<>();
+
+ static {
+ BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("no_fault_tolerance", NoFaultToleranceStrategy.class);
+ BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("metadata_node", MetadataNodeFaultToleranceStrategy.class);
+ BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("auto", AutoFaultToleranceStrategy.class);
+ }
+
+ private FaultToleranceStrategyFactory() {
+ throw new AssertionError();
+ }
+
+ public static IFaultToleranceStrategy create(Cluster cluster, IReplicationStrategy repStrategy,
+ ICCMessageBroker messageBroker) {
+ boolean highAvailabilityEnabled = cluster.getHighAvailability() != null
+ && cluster.getHighAvailability().getEnabled() != null
+ && Boolean.valueOf(cluster.getHighAvailability().getEnabled());
+
+ if (!highAvailabilityEnabled || cluster.getHighAvailability().getFaultTolerance() == null
+ || cluster.getHighAvailability().getFaultTolerance().getStrategy() == null) {
+ return new NoFaultToleranceStrategy().from(repStrategy, messageBroker);
+ }
+ String strategyName = cluster.getHighAvailability().getFaultTolerance().getStrategy().toLowerCase();
+ if (!BUILT_IN_FAULT_TOLERANCE_STRATEGY.containsKey(strategyName)) {
+ throw new IllegalArgumentException(String.format("Unsupported Replication Strategy. Available types: %s",
+ BUILT_IN_FAULT_TOLERANCE_STRATEGY.keySet().toString()));
+ }
+ Class<? extends IFaultToleranceStrategy> clazz = BUILT_IN_FAULT_TOLERANCE_STRATEGY.get(strategyName);
+ try {
+ return clazz.newInstance().from(repStrategy, messageBroker);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
\ No newline at end of file
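The factory above maps a strategy name from the cluster description's high-availability section to one of the built-in implementations, and falls back to NoFaultToleranceStrategy when high availability is disabled or no strategy is named. A hedged wiring sketch follows; the class and method names here are invented, and the real call site is the CC bootstrap, which imports this factory further below.

    package org.apache.asterix.app.replication;

    import org.apache.asterix.common.messaging.api.ICCMessageBroker;
    import org.apache.asterix.common.replication.IFaultToleranceStrategy;
    import org.apache.asterix.common.replication.IReplicationStrategy;
    import org.apache.asterix.event.schema.cluster.Cluster;

    // Hypothetical wiring sketch; not part of this change.
    public final class FaultToleranceWiringSketch {

        private FaultToleranceWiringSketch() {
        }

        public static IFaultToleranceStrategy select(Cluster cluster, IReplicationStrategy repStrategy,
                ICCMessageBroker messageBroker) {
            // Picks no_fault_tolerance, metadata_node, or auto based on the
            // high-availability section of the cluster description file.
            return FaultToleranceStrategyFactory.create(cluster, repStrategy, messageBroker);
        }
    }

Because the registry is keyed by lower-cased names, the strategy value in the cluster description is effectively case-insensitive.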
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
new file mode 100644
index 0000000..e6638e8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
+import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
+import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
+import org.apache.asterix.app.nc.task.RemoteRecoveryTask;
+import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage;
+import org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage;
+import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.asterix.util.FaultToleranceUtil;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+ private static final Logger LOGGER = Logger.getLogger(MetadataNodeFaultToleranceStrategy.class.getName());
+ private IClusterStateManager clusterManager;
+ private String metadataNodeId;
+ private IReplicationStrategy replicationStrategy;
+ private ICCMessageBroker messageBroker;
+ private final Set<String> hotStandbyMetadataReplica = new HashSet<>();
+ private final Set<String> failedNodes = new HashSet<>();
+ private Set<String> pendingStartupCompletionNodes = new HashSet<>();
+
+ @Override
+ public synchronized void notifyNodeJoin(String nodeId) throws HyracksDataException {
+ pendingStartupCompletionNodes.add(nodeId);
+ }
+
+ @Override
+ public synchronized void notifyNodeFailure(String nodeId) throws HyracksDataException {
+ failedNodes.add(nodeId);
+ hotStandbyMetadataReplica.remove(nodeId);
+ clusterManager.updateNodePartitions(nodeId, false);
+ if (nodeId.equals(metadataNodeId)) {
+ clusterManager.updateMetadataNode(metadataNodeId, false);
+ }
+ clusterManager.refreshState();
+ if (replicationStrategy.isParticipant(nodeId)) {
+ // Notify impacted replicas
+ FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE, clusterManager,
+ messageBroker, replicationStrategy);
+ }
+ // If the failed node is the metadata node, ask its replicas to replay any committed jobs
+ if (nodeId.equals(metadataNodeId)) {
+ int metadataPartitionId = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition()
+ .getPartitionId();
+ Set<Integer> metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId));
+ Set<Replica> activeRemoteReplicas = replicationStrategy.getRemoteReplicas(metadataNodeId).stream()
+ .filter(replica -> !failedNodes.contains(replica.getId())).collect(Collectors.toSet());
+ //TODO Do election to identify the node with the latest state
+ for (Replica replica : activeRemoteReplicas) {
+ ReplayPartitionLogsRequestMessage msg = new ReplayPartitionLogsRequestMessage(metadataPartition);
+ try {
+ messageBroker.sendApplicationMessageToNC(msg, replica.getId());
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
+ continue;
+ }
+ }
+ }
+ }
+
+ @Override
+ public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) {
+ MetadataNodeFaultToleranceStrategy ft = new MetadataNodeFaultToleranceStrategy();
+ ft.replicationStrategy = replicationStrategy;
+ ft.messageBroker = messageBroker;
+ return ft;
+ }
+
+ @Override
+ public synchronized void process(INCLifecycleMessage message) throws HyracksDataException {
+ switch (message.getType()) {
+ case STARTUP_TASK_REQUEST:
+ process((StartupTaskRequestMessage) message);
+ break;
+ case STARTUP_TASK_RESULT:
+ process((NCLifecycleTaskReportMessage) message);
+ break;
+ case REPLAY_LOGS_RESPONSE:
+ process((ReplayPartitionLogsResponseMessage) message);
+ break;
+ default:
+ throw new HyracksDataException("Unsupported message type: " + message.getType().name());
+ }
+ }
+
+ @Override
+ public synchronized void bindTo(IClusterStateManager clusterManager) {
+ this.clusterManager = clusterManager;
+ this.metadataNodeId = clusterManager.getCurrentMetadataNodeId();
+ }
+
+ private synchronized void process(ReplayPartitionLogsResponseMessage msg) {
+ hotStandbyMetadataReplica.add(msg.getNodeId());
+ LOGGER.log(Level.INFO, "Hot Standby Metadata Replicas: " + hotStandbyMetadataReplica);
+ }
+
+ private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ final String nodeId = msg.getNodeId();
+ final SystemState state = msg.getState();
+ final boolean isParticipant = replicationStrategy.isParticipant(nodeId);
+ List<INCLifecycleTask> tasks;
+ if (!isParticipant) {
+ tasks = buildNonParticipantStartupSequence(nodeId, state);
+ } else {
+ tasks = buildParticipantStartupSequence(nodeId, state);
+ }
+ StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ try {
+ messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
+ } catch (Exception e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ private synchronized void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
+ final String nodeId = msg.getNodeId();
+ pendingStartupCompletionNodes.remove(nodeId);
+ if (msg.isSuccess()) {
+ // If this node failed and recovered, notify impacted replicas to reconnect to it
+ if (replicationStrategy.isParticipant(nodeId) && failedNodes.remove(nodeId)) {
+ FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN, clusterManager,
+ messageBroker, replicationStrategy);
+ }
+ clusterManager.updateNodePartitions(msg.getNodeId(), true);
+ if (msg.getNodeId().equals(metadataNodeId)) {
+ clusterManager.updateMetadataNode(metadataNodeId, true);
+ // When metadata node is active, it is the only hot standby replica
+ hotStandbyMetadataReplica.clear();
+ hotStandbyMetadataReplica.add(metadataNodeId);
+ }
+ clusterManager.refreshState();
+ } else {
+ LOGGER.log(Level.SEVERE, msg.getNodeId() + " failed to complete startup. ", msg.getException());
+ }
+ }
+
+ private List<INCLifecycleTask> buildNonParticipantStartupSequence(String nodeId, SystemState state) {
+ final List<INCLifecycleTask> tasks = new ArrayList<>();
+ if (state == SystemState.CORRUPTED) {
+ //need to perform local recovery for node partitions
+ LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
+ .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ tasks.add(rt);
+ }
+ tasks.add(new ExternalLibrarySetupTask(false));
+ tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new CheckpointTask());
+ tasks.add(new StartLifecycleComponentsTask());
+ return tasks;
+ }
+
+ private List<INCLifecycleTask> buildParticipantStartupSequence(String nodeId, SystemState state) {
+ final List<INCLifecycleTask> tasks = new ArrayList<>();
+ switch (state) {
+ case NEW_UNIVERSE:
+ // If the metadata node (or replica) failed and lost its data
+ // => Metadata Remote Recovery from standby replica
+ tasks.add(getMetadataPartitionRecoveryPlan());
+ // Checkpoint after remote recovery to move node to HEALTHY state
+ tasks.add(new CheckpointTask());
+ break;
+ case CORRUPTED:
+ // If the metadata node (or replica) failed and started again without losing data => Local Recovery
+ LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
+ .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ tasks.add(rt);
+ break;
+ case INITIAL_RUN:
+ case HEALTHY:
+ case RECOVERING:
+ break;
+ default:
+ break;
+ }
+ tasks.add(new StartReplicationServiceTask());
+ final boolean isMetadataNode = nodeId.equals(metadataNodeId);
+ if (isMetadataNode) {
+ tasks.add(new MetadataBootstrapTask());
+ }
+ tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
+ tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new CheckpointTask());
+ tasks.add(new StartLifecycleComponentsTask());
+ if (isMetadataNode) {
+ tasks.add(new BindMetadataNodeTask(true));
+ }
+ return tasks;
+ }
+
+ private RemoteRecoveryTask getMetadataPartitionRecoveryPlan() {
+ if (hotStandbyMetadataReplica.isEmpty()) {
+ throw new IllegalStateException("No metadata replicas to recover from");
+ }
+ // Construct recovery plan: Node => Set of partitions to recover from it
+ Map<String, Set<Integer>> recoveryPlan = new HashMap<>();
+ // Recover metadata partition from any metadata hot standby replica
+ int metadataPartitionId = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition()
+ .getPartitionId();
+ Set<Integer> metadataPartition = new HashSet<>(Arrays.asList(metadataPartitionId));
+ recoveryPlan.put(hotStandbyMetadataReplica.iterator().next(), metadataPartition);
+ return new RemoteRecoveryTask(recoveryPlan);
+ }
+}
\ No newline at end of file
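getMetadataPartitionRecoveryPlan above builds a remote-recovery plan shaped as a map from a source replica to the set of partitions to recover from it, picking any available hot-standby metadata replica. A standalone sketch of that shape follows; the node names and partition id are made up for illustration.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    // Standalone illustration; not part of this change.
    public class RecoveryPlanSketch {

        static Map<String, Set<Integer>> metadataRecoveryPlan(Set<String> hotStandbyReplicas, int metadataPartitionId) {
            if (hotStandbyReplicas.isEmpty()) {
                throw new IllegalStateException("No metadata replicas to recover from");
            }
            // Node id -> set of partitions to recover from that node
            Map<String, Set<Integer>> plan = new LinkedHashMap<>();
            plan.put(hotStandbyReplicas.iterator().next(), Collections.singleton(metadataPartitionId));
            return plan;
        }

        public static void main(String[] args) {
            Set<String> standbys = new HashSet<>(Arrays.asList("nc2", "nc3"));
            System.out.println(metadataRecoveryPlan(standbys, 0)); // e.g. {nc2=[0]}
        }
    }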
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
new file mode 100644
index 0000000..0ee4f6a
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
+import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask;
+import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
+import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask;
+import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
+import org.apache.asterix.app.replication.message.StartupTaskResponseMessage;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+ private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName());
+ IClusterStateManager clusterManager;
+ private String metadataNodeId;
+ private ICCMessageBroker messageBroker;
+ private Set<String> pendingStartupCompletionNodes = new HashSet<>();
+
+ @Override
+ public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+ pendingStartupCompletionNodes.add(nodeId);
+ }
+
+ @Override
+ public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+ pendingStartupCompletionNodes.remove(nodeId);
+ clusterManager.updateNodePartitions(nodeId, false);
+ if (nodeId.equals(metadataNodeId)) {
+ clusterManager.updateMetadataNode(metadataNodeId, false);
+ }
+ clusterManager.refreshState();
+ }
+
+ @Override
+ public void process(INCLifecycleMessage message) throws HyracksDataException {
+ switch (message.getType()) {
+ case STARTUP_TASK_REQUEST:
+ process((StartupTaskRequestMessage) message);
+ break;
+ case STARTUP_TASK_RESULT:
+ process((NCLifecycleTaskReportMessage) message);
+ break;
+ default:
+ throw new HyracksDataException("Unsupported message type: " + message.getType().name());
+ }
+ }
+
+ @Override
+ public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) {
+ NoFaultToleranceStrategy ft = new NoFaultToleranceStrategy();
+ ft.messageBroker = messageBroker;
+ return ft;
+ }
+
+ @Override
+ public void bindTo(IClusterStateManager clusterManager) {
+ this.clusterManager = clusterManager;
+ metadataNodeId = clusterManager.getCurrentMetadataNodeId();
+ }
+
+ private void process(StartupTaskRequestMessage msg) throws HyracksDataException {
+ final String nodeId = msg.getNodeId();
+ List<INCLifecycleTask> tasks = buildNCStartupSequence(msg.getNodeId(), msg.getState());
+ StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks);
+ try {
+ messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
+ } catch (Exception e) {
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException {
+ pendingStartupCompletionNodes.remove(msg.getNodeId());
+ if (msg.isSuccess()) {
+ clusterManager.updateNodePartitions(msg.getNodeId(), true);
+ if (msg.getNodeId().equals(metadataNodeId)) {
+ clusterManager.updateMetadataNode(metadataNodeId, true);
+ }
+ clusterManager.refreshState();
+ } else {
+ LOGGER.log(Level.SEVERE, msg.getNodeId() + " failed to complete startup. ", msg.getException());
+ }
+ }
+
+ private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) {
+ final List<INCLifecycleTask> tasks = new ArrayList<>();
+ if (state == SystemState.CORRUPTED) {
+ //need to perform local recovery for node partitions
+ LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.asList(clusterManager.getNodePartitions(nodeId))
+ .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ tasks.add(rt);
+ }
+ final boolean isMetadataNode = nodeId.equals(metadataNodeId);
+ if (isMetadataNode) {
+ tasks.add(new MetadataBootstrapTask());
+ }
+ tasks.add(new ExternalLibrarySetupTask(isMetadataNode));
+ tasks.add(new ReportMaxResourceIdTask());
+ tasks.add(new CheckpointTask());
+ tasks.add(new StartLifecycleComponentsTask());
+ if (isMetadataNode) {
+ tasks.add(new BindMetadataNodeTask(true));
+ }
+ return tasks;
+ }
+}
\ No newline at end of file
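All three strategies derive the startup work from the SystemState the NC reports: a CORRUPTED state prepends local recovery, and the metadata node gets additional bootstrap and bind steps. A standalone sketch of that decision follows, using stand-in types; the real tasks are INCLifecycleTask implementations and the real SystemState lives in IRecoveryManager.

    import java.util.ArrayList;
    import java.util.List;

    // Standalone sketch with stand-in types; not part of this change.
    public class StartupDecisionSketch {

        enum SystemState { INITIAL_RUN, HEALTHY, CORRUPTED }

        static List<String> startupSteps(SystemState state, boolean isMetadataNode) {
            List<String> steps = new ArrayList<>();
            if (state == SystemState.CORRUPTED) {
                steps.add("local recovery of node partitions"); // only needed after an unclean shutdown
            }
            if (isMetadataNode) {
                steps.add("metadata bootstrap");
            }
            steps.add("external library setup");
            steps.add("report max resource id");
            steps.add("checkpoint");
            steps.add("start lifecycle components");
            if (isMetadataNode) {
                steps.add("bind metadata node");
            }
            return steps;
        }

        public static void main(String[] args) {
            System.out.println(startupSteps(SystemState.CORRUPTED, true));
        }
    }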
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
similarity index 96%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
index 24f5fe8..1abc3f0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
@@ -16,13 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
+
public class NodeFailbackPlan {
public enum FailbackPlanState {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
similarity index 93%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
index 2454800..2d423f9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
import java.io.IOException;
import java.util.Set;
@@ -27,6 +27,7 @@
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -89,4 +90,9 @@
throw hde;
}
}
-}
+
+ @Override
+ public MessageType getType() {
+ return MessageType.COMPLETE_FAILBACK_REQUEST;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
similarity index 82%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
index 07c366c..fb45892 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackResponseMessage.java
@@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
-import java.util.Set;
-
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
+import java.util.Set;
+
public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage {
private static final long serialVersionUID = 1L;
@@ -49,6 +50,11 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- ClusterStateManager.INSTANCE.processCompleteFailbackResponse(this);
+ AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+ }
+
+ @Override
+ public MessageType getType() {
+ return MessageType.COMPLETE_FAILBACK_RESPONSE;
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
similarity index 60%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
copy to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 81492c2..3af075e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -16,45 +16,48 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-public class TakeoverPartitionsResponseMessage implements IApplicationMessage {
+public class NCLifecycleTaskReportMessage implements INCLifecycleMessage {
private static final long serialVersionUID = 1L;
- private final Integer[] partitions;
private final String nodeId;
- private final long requestId;
+ private final boolean success;
+ private Exception exception;
- public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
- this.requestId = requestId;
+ public NCLifecycleTaskReportMessage(String nodeId, boolean success) {
this.nodeId = nodeId;
- this.partitions = partitionsToTakeover;
+ this.success = success;
}
- public Integer[] getPartitions() {
- return partitions;
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+ AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
}
public String getNodeId() {
return nodeId;
}
- public long getRequestId() {
- return requestId;
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public void setException(Exception exception) {
+ this.exception = exception;
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this);
+ public MessageType getType() {
+ return MessageType.STARTUP_TASK_RESULT;
}
-
- @Override
- public String toString() {
- return TakeoverPartitionsResponseMessage.class.getSimpleName();
- }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
similarity index 95%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
index 985c741..2104f9c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
import java.rmi.RemoteException;
import java.util.Set;
@@ -26,6 +26,7 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -116,4 +117,9 @@
throw ExceptionUtils.convertToHyracksDataException(e);
}
}
+
+ @Override
+ public MessageType getType() {
+ return MessageType.PREPARE_FAILBACK_REQUEST;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
similarity index 82%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
index c655ecd..e02cd42 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackResponseMessage.java
@@ -16,14 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
-import java.util.Set;
-
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
+import java.util.Set;
+
public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
private static final long serialVersionUID = 1L;
@@ -40,11 +41,16 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this);
+ AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
}
@Override
public String toString() {
return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString();
}
+
+ @Override
+ public MessageType getType() {
+ return MessageType.PREPARE_FAILBACK_RESPONSE;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
new file mode 100644
index 0000000..96ae8be
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage {
+
+ private static final Logger LOGGER = Logger.getLogger(ReplayPartitionLogsRequestMessage.class.getName());
+ private static final long serialVersionUID = 1L;
+ private final Set<Integer> partitions;
+
+ public ReplayPartitionLogsRequestMessage(Set<Integer> partitions) {
+ this.partitions = partitions;
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ // Replay the logs for these partitions and flush any impacted dataset
+ appContext.getRemoteRecoveryManager().replayReplicaPartitionLogs(partitions, true);
+
+ INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ ReplayPartitionLogsResponseMessage response = new ReplayPartitionLogsResponseMessage(ncs.getId(), partitions);
+ try {
+ broker.sendMessageToCC(response);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ @Override
+ public MessageType getType() {
+ return MessageType.REPLAY_LOGS_REQUEST;
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
similarity index 67%
copy from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
copy to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
index c655ecd..dc19735 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsResponseMessage.java
@@ -16,35 +16,41 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
-import java.util.Set;
-
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
+import java.util.Set;
+
+public class ReplayPartitionLogsResponseMessage implements INCLifecycleMessage {
private static final long serialVersionUID = 1L;
private final Set<Integer> partitions;
+ private final String nodeId;
- public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
- super(planId, requestId);
+ public ReplayPartitionLogsResponseMessage(String nodeId, Set<Integer> partitions) {
this.partitions = partitions;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+ AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
}
public Set<Integer> getPartitions() {
return partitions;
}
- @Override
- public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this);
+ public String getNodeId() {
+ return nodeId;
}
@Override
- public String toString() {
- return PreparePartitionsFailbackResponseMessage.class.getSimpleName() + " " + partitions.toString();
+ public MessageType getType() {
+ return MessageType.REPLAY_LOGS_RESPONSE;
}
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
new file mode 100644
index 0000000..be42a9d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.runtime.utils.AppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class StartupTaskRequestMessage implements INCLifecycleMessage {
+
+ private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName());
+ private static final long serialVersionUID = 1L;
+ private final SystemState state;
+ private final String nodeId;
+
+ public StartupTaskRequestMessage(String nodeId, SystemState state) {
+ this.state = state;
+ this.nodeId = nodeId;
+ }
+
+ public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException {
+ try {
+ StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState);
+ ((INCMessageBroker) cs.getApplicationContext().getMessageBroker()).sendMessageToCC(msg);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+ AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
+ }
+
+ public SystemState getState() {
+ return state;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public MessageType getType() {
+ return MessageType.STARTUP_TASK_REQUEST;
+ }
+}
\ No newline at end of file
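An NC starts the customizable startup handshake by reporting its SystemState to the CC through StartupTaskRequestMessage.send; the bound fault-tolerance strategy replies with a StartupTaskResponseMessage listing the tasks to run. A hypothetical call-site sketch follows; the wrapper class and method names are invented, and only the send call is taken from this change.

    package org.apache.asterix.app.replication.message;

    import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.control.nc.NodeControllerService;

    // Hypothetical call-site sketch; the real call is made during NC startup.
    public final class StartupHandshakeSketch {

        private StartupHandshakeSketch() {
        }

        public static void requestStartupTasks(NodeControllerService ncs, SystemState state)
                throws HyracksDataException {
            // Report the local system state to the CC; the fault-tolerance strategy
            // answers with a StartupTaskResponseMessage carrying the tasks to run.
            StartupTaskRequestMessage.send(ncs, state);
        }
    }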
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
new file mode 100644
index 0000000..6a72776
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.replication.message;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class StartupTaskResponseMessage implements INCLifecycleMessage {
+
+ private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName());
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+ private final List<INCLifecycleTask> tasks;
+
+ public StartupTaskResponseMessage(String nodeId, List<INCLifecycleTask> tasks) {
+ this.nodeId = nodeId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
+ NodeControllerService ncs = (NodeControllerService) cs;
+ INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+ boolean success = true;
+ HyracksDataException exception = null;
+ try {
+ for (INCLifecycleTask task : tasks) {
+ task.perform(cs);
+ }
+ } catch (HyracksDataException e) {
+ success = false;
+ exception = e;
+ }
+ NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success);
+ result.setException(exception);
+ try {
+ broker.sendMessageToCC(result);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+ throw ExceptionUtils.convertToHyracksDataException(e);
+ }
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public MessageType getType() {
+ return MessageType.STARTUP_TASK_RESPONSE;
+ }
+}
\ No newline at end of file
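On the NC, the response above performs its tasks strictly in the order the CC built them and then reports success, or the first failure, back as a STARTUP_TASK_RESULT. A standalone sketch of that loop follows, with a stand-in task type; the real tasks implement INCLifecycleTask and receive the controller service.

    import java.util.Arrays;
    import java.util.List;

    // Standalone sketch with a stand-in task type; not part of this change.
    public class OrderedTaskExecutionSketch {

        interface Task {
            void perform() throws Exception;
        }

        static boolean performAll(List<Task> tasks) {
            try {
                for (Task task : tasks) {
                    task.perform(); // tasks run in the order the CC built them
                }
                return true; // reported to the CC as a successful startup
            } catch (Exception e) {
                return false; // reported to the CC with the exception attached
            }
        }

        public static void main(String[] args) {
            List<Task> tasks = Arrays.<Task> asList(
                    () -> System.out.println("checkpoint"),
                    () -> System.out.println("start lifecycle components"));
            System.out.println("success = " + performAll(tasks));
        }
    }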
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
similarity index 89%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
index 1e67052..8ce12dd 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
@@ -16,20 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class TakeoverMetadataNodeRequestMessage implements IApplicationMessage {
+public class TakeoverMetadataNodeRequestMessage implements INCLifecycleMessage {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
@@ -66,4 +66,9 @@
public String toString() {
return TakeoverMetadataNodeRequestMessage.class.getSimpleName();
}
-}
+
+ @Override
+ public MessageType getType() {
+ return MessageType.TAKEOVER_METADATA_NODE_REQUEST;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
similarity index 75%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
index d8b2136..428047c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-public class TakeoverMetadataNodeResponseMessage implements IApplicationMessage {
+public class TakeoverMetadataNodeResponseMessage implements INCLifecycleMessage {
private static final long serialVersionUID = 1L;
private final String nodeId;
@@ -38,11 +38,16 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- ClusterStateManager.INSTANCE.processMetadataNodeTakeoverResponse(this);
+ AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
}
@Override
public String toString() {
return TakeoverMetadataNodeResponseMessage.class.getSimpleName();
}
+
+ @Override
+ public MessageType getType() {
+ return MessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
similarity index 90%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
index fb8a33b..4e415de 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
import java.io.IOException;
import java.util.logging.Level;
@@ -25,14 +25,14 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class TakeoverPartitionsRequestMessage implements IApplicationMessage {
+public class TakeoverPartitionsRequestMessage implements INCLifecycleMessage {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
@@ -76,8 +76,7 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
- IAppRuntimeContext appContext =
- (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ IAppRuntimeContext appContext = (IAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
//if the NC is shutting down, it should ignore takeover partitions request
if (!appContext.isShuttingdown()) {
@@ -104,4 +103,9 @@
}
}
}
-}
+
+ @Override
+ public MessageType getType() {
+ return MessageType.TAKEOVER_PARTITION_REQUEST;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
similarity index 79%
rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
index 81492c2..e653a64 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsResponseMessage.java
@@ -16,14 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.runtime.message;
+package org.apache.asterix.app.replication.message;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
+import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
-public class TakeoverPartitionsResponseMessage implements IApplicationMessage {
+public class TakeoverPartitionsResponseMessage implements INCLifecycleMessage {
private static final long serialVersionUID = 1L;
private final Integer[] partitions;
@@ -50,11 +50,16 @@
@Override
public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
- ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this);
+ AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(this);
}
@Override
public String toString() {
return TakeoverPartitionsResponseMessage.class.getSimpleName();
}
+
+ @Override
+ public MessageType getType() {
+ return MessageType.TAKEOVER_PARTITION_RESPONSE;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 62eb250..f3182cf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -18,41 +18,23 @@
*/
package org.apache.asterix.hyracks.bootstrap;
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
import org.apache.asterix.active.ActiveLifecycleListener;
-import org.apache.asterix.api.http.server.ApiServlet;
-import org.apache.asterix.api.http.server.ClusterApiServlet;
-import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
-import org.apache.asterix.api.http.server.ConnectorApiServlet;
-import org.apache.asterix.api.http.server.DdlApiServlet;
-import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
-import org.apache.asterix.api.http.server.FeedServlet;
-import org.apache.asterix.api.http.server.FullApiServlet;
-import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
-import org.apache.asterix.api.http.server.QueryApiServlet;
-import org.apache.asterix.api.http.server.QueryResultApiServlet;
-import org.apache.asterix.api.http.server.QueryServiceServlet;
-import org.apache.asterix.api.http.server.QueryStatusApiServlet;
-import org.apache.asterix.api.http.server.QueryWebInterfaceServlet;
-import org.apache.asterix.api.http.server.ShutdownApiServlet;
-import org.apache.asterix.api.http.server.UpdateApiServlet;
-import org.apache.asterix.api.http.server.VersionApiServlet;
+import org.apache.asterix.api.http.server.*;
import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.app.cc.CCExtensionManager;
import org.apache.asterix.app.cc.ResourceIdManager;
import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.utils.Servlets;
import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.file.StorageComponentProvider;
@@ -70,13 +52,19 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
-import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.http.server.HttpServer;
import org.apache.hyracks.http.server.WebManager;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
@@ -90,7 +78,7 @@
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
final ClusterControllerService controllerService = (ClusterControllerService) ccAppCtx.getControllerService();
- IMessageBroker messageBroker = new CCMessageBroker(controllerService);
+ ICCMessageBroker messageBroker = new CCMessageBroker(controllerService);
this.appCtx = ccAppCtx;
if (LOGGER.isLoggable(Level.INFO)) {
@@ -100,11 +88,14 @@
appCtx.setThreadFactory(new AsterixThreadFactory(appCtx.getThreadFactory(), new LifeCycleComponentManager()));
ILibraryManager libraryManager = new ExternalLibraryManager();
ResourceIdManager resourceIdManager = new ResourceIdManager();
+ IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
+ IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory
+ .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, messageBroker);
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
componentProvider = new StorageComponentProvider();
GlobalRecoveryManager.instantiate((HyracksConnection) getNewHyracksClientConnection(), componentProvider);
AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), libraryManager, resourceIdManager,
- () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance());
+ () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy);
ccExtensionManager = new CCExtensionManager(getExtensions());
AppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
final CCConfig ccConfig = controllerService.getCCConfig();
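For illustration (not part of the patch), the net effect of the CC changes above is that both strategies are resolved from cluster.xml once at startup and injected into AppContextInfo. A condensed sketch of that wiring, using only names that appear in this change:

    // Condensed view of the new CC startup wiring.
    ICCMessageBroker messageBroker = new CCMessageBroker(controllerService);

    // <high_availability><data_replication><strategy> selects a built-in IReplicationStrategy
    // (metadata_only or chained_declustering).
    IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();

    // <high_availability><fault_tolerance><strategy> selects a built-in IFaultToleranceStrategy
    // (metadata_node or auto); it is handed the replication strategy and the CC message broker.
    IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory
            .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, messageBroker);

    // AppContextInfo now carries the fault-tolerance strategy, so NC lifecycle messages can be
    // dispatched to it (e.g. AppContextInfo.INSTANCE.getFaultToleranceStrategy().process(msg)).
    AppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), libraryManager, resourceIdManager,
            () -> MetadataManager.INSTANCE, GlobalRecoveryManager.instance(), ftStrategy);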
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index bc270df..ce57648 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -18,27 +18,17 @@
*/
package org.apache.asterix.hyracks.bootstrap;
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
+import org.apache.asterix.app.replication.message.StartupTaskRequestMessage;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.config.AsterixExtension;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.StorageProperties;
+import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.utils.PrintUtil;
@@ -47,39 +37,39 @@
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
-import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
- @Option(name = "-initial-run",
- usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
+ @Option(name = "-initial-run", usage = "A flag indicating if it's the first time the NC is started "
+ + "(default: false)", required = false)
public boolean initialRun = false;
- @Option(name = "-virtual-NC",
- usage = "A flag indicating if this NC is running on virtual cluster " + "(default: false)",
- required = false)
+ @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is running on virtual cluster "
+ + "(default: false)", required = false)
public boolean virtualNC = false;
private INCApplicationContext ncApplicationContext = null;
private IAppRuntimeContext runtimeContext;
private String nodeId;
- private boolean isMetadataNode = false;
private boolean stopInitiated = false;
- private SystemState systemState = SystemState.NEW_UNIVERSE;
- private boolean pendingFailbackCompletion = false;
+ private SystemState systemState;
private IMessageBroker messageBroker;
@Override
@@ -93,8 +83,8 @@
parser.printUsage(System.err);
throw e;
}
- ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getThreadFactory(),
- ncAppCtx.getLifeCycleComponentManager()));
+ ncAppCtx.setThreadFactory(
+ new AsterixThreadFactory(ncAppCtx.getThreadFactory(), ncAppCtx.getLifeCycleComponentManager()));
ncApplicationContext = ncAppCtx;
nodeId = ncApplicationContext.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
@@ -104,12 +94,11 @@
final NodeControllerService controllerService = (NodeControllerService) ncAppCtx.getControllerService();
if (System.getProperty("java.rmi.server.hostname") == null) {
- System.setProperty("java.rmi.server.hostname", (controllerService)
- .getConfiguration().clusterNetPublicIPAddress);
+ System.setProperty("java.rmi.server.hostname",
+ (controllerService).getConfiguration().clusterNetPublicIPAddress);
}
runtimeContext = new NCAppRuntimeContext(ncApplicationContext, getExtensions());
- MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
- .getMetadataProperties();
+ MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Substitute node joining : " + ncApplicationContext.getNodeId());
@@ -118,69 +107,35 @@
}
runtimeContext.initialize(initialRun);
ncApplicationContext.setApplicationObject(runtimeContext);
- MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext)
- .getMessagingProperties();
+ MessagingProperties messagingProperties = ((IPropertiesProvider) runtimeContext).getMessagingProperties();
messageBroker = new NCMessageBroker(controllerService, messagingProperties);
ncApplicationContext.setMessageBroker(messageBroker);
MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
(NCMessageBroker) messageBroker, messagingProperties);
ncApplicationContext.setMessagingChannelInterfaceFactory(interfaceFactory);
- boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
- boolean autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
- if (initialRun) {
- LOGGER.info("System is being initialized. (first run)");
- } else {
- IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- systemState = recoveryMgr.getSystemState();
+ IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+ systemState = recoveryMgr.getSystemState();
+ if (systemState == SystemState.NEW_UNIVERSE) {
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("System is in a state: " + systemState);
+ LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
+ LOGGER.info("Node ID: " + nodeId);
+ LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
+ LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]);
}
-
- //do not attempt to perform remote recovery if this is a virtual NC
- if (autoFailover && !virtualNC) {
- if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
- //Start failback process
- IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
- remoteRecoveryMgr.startFailbackProcess();
- systemState = SystemState.RECOVERING;
- pendingFailbackCompletion = true;
- }
- } else {
- //recover if the system is corrupted by checking system state.
- if (systemState == SystemState.CORRUPTED) {
- recoveryMgr.startRecovery(true);
- }
- }
+ PersistentLocalResourceRepository localResourceRepository =
+ (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
+ localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
}
- /**
- * if the node pending failback completion, the replication channel
- * should not be opened to avoid other nodes connecting to it before
- * the node completes its failback. CC will notify other replicas once
- * this node is ready to receive replication requests.
- */
- if (replicationEnabled && !pendingFailbackCompletion) {
- startReplicationService();
- }
+ performLocalCleanUp();
}
protected List<AsterixExtension> getExtensions() {
return Collections.emptyList();
}
- private void startReplicationService() throws InterruptedException {
- //Open replication channel
- runtimeContext.getReplicationChannel().start();
-
- //Check the state of remote replicas
- runtimeContext.getReplicationManager().initializeReplicasState();
-
- //Start replication after the state of remote replicas has been initialized.
- runtimeContext.getReplicationManager().startReplicationThreads();
- }
-
@Override
public void stop() throws Exception {
if (!stopInitiated) {
@@ -204,63 +159,13 @@
@Override
public void notifyStartupComplete() throws Exception {
- //Send max resource id on this NC to the CC
- ReportMaxResourceIdMessage.send((NodeControllerService) ncApplicationContext.getControllerService());
- MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
- .getMetadataProperties();
- if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
- LOGGER.info("Node ID: " + nodeId);
- LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
- LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]);
- }
-
- PersistentLocalResourceRepository localResourceRepository =
- (PersistentLocalResourceRepository) runtimeContext
- .getLocalResourceRepository();
- localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
+ // Since we don't pass the initial-run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag
+ if (systemState == SystemState.NEW_UNIVERSE && (initialRun || virtualNC)) {
+ systemState = SystemState.INITIAL_RUN;
}
-
- isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
- if (isMetadataNode && !pendingFailbackCompletion) {
- runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
- }
- ExternalLibraryUtils.setUpExternaLibraries(runtimeContext.getLibraryManager(),
- isMetadataNode && !pendingFailbackCompletion);
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Starting lifecycle components");
- }
-
- Map<String, String> lifecycleMgmtConfiguration = new HashMap<String, String>();
- String dumpPathKey = LifeCycleComponentManager.Config.DUMP_PATH_KEY;
- String dumpPath = metadataProperties.getCoredumpPath(nodeId);
- lifecycleMgmtConfiguration.put(dumpPathKey, dumpPath);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Coredump directory for NC is: " + dumpPath);
- }
- ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
- lccm.configure(lifecycleMgmtConfiguration);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Configured:" + lccm);
- }
- ncApplicationContext.setStateDumpHandler(
- new AsterixStateDumpHandler(ncApplicationContext.getNodeId(), lccm.getDumpPath(), lccm));
-
- lccm.startAll();
-
- if (!pendingFailbackCompletion) {
- ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager();
- checkpointMgr.doSharpCheckpoint();
-
- if (isMetadataNode) {
- runtimeContext.exportMetadataNodeStub();
- }
- }
-
- //Clean any temporary files
- performLocalCleanUp();
+ // Request startup tasks from CC
+ StartupTaskRequestMessage.send((NodeControllerService) ncApplicationContext.getControllerService(),
+ systemState);
}
@Override
@@ -296,8 +201,7 @@
}
private void updateOnNodeJoin() {
- MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext)
- .getMetadataProperties();
+ MetadataProperties metadataProperties = ((IPropertiesProvider) runtimeContext).getMetadataProperties();
if (!metadataProperties.getNodeNames().contains(nodeId)) {
metadataProperties.getNodeNames().add(nodeId);
Cluster cluster = ClusterProperties.INSTANCE.getCluster();
@@ -305,8 +209,7 @@
throw new IllegalStateException("No cluster configuration found for this instance");
}
String asterixInstanceName = metadataProperties.getInstanceName();
- TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext)
- .getTransactionProperties();
+ TransactionProperties txnProperties = ((IPropertiesProvider) runtimeContext).getTransactionProperties();
Node self = null;
List<Node> nodes;
if (cluster.getSubstituteNodes() != null) {
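The NC no longer decides locally between a fresh start, local recovery, and remote failback; it determines its SystemState and asks the CC for startup tasks, and the configured fault-tolerance strategy answers. A hedged sketch of the sequence (only the request side is visible in this hunk; the shape of the response is an assumption):

    // NC side (notifyStartupComplete above): report local state and wait for instructions.
    StartupTaskRequestMessage.send((NodeControllerService) ncApplicationContext.getControllerService(),
            systemState);

    // CC side: the IFaultToleranceStrategy handles the STARTUP_TASK_REQUEST and replies with an
    // ordered list of INCLifecycleTask objects tailored to the node's state (NEW_UNIVERSE,
    // INITIAL_RUN, CORRUPTED, ...).

    // NC side, on receiving the response (illustrative; the response class is not shown here):
    // for (INCLifecycleTask task : startupTasks) {
    //     task.perform(controllerService);
    // }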
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
new file mode 100644
index 0000000..0a9a215
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+
+public class FaultToleranceUtil {
+
+ private static final Logger LOGGER = Logger.getLogger(FaultToleranceUtil.class.getName());
+ private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
+
+ private FaultToleranceUtil() {
+ throw new AssertionError();
+ }
+
+ public static void notifyImpactedReplicas(String nodeId, ClusterEventType event,
+ IClusterStateManager clusterManager, ICCMessageBroker messageBroker,
+ IReplicationStrategy replicationStrategy) {
+ List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+ .map(Replica::getId).collect(Collectors.toList());
+ String nodeIdAddress = StringUtils.EMPTY;
+ Map<String, Map<String, String>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
+ // In case the node joined with a new IP address, we need to send it to the other replicas
+ if (event == ClusterEventType.NODE_JOIN) {
+ nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
+ }
+ ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
+ for (String replica : primaryRemoteReplicas) {
+ // If the remote replica is alive, send the event
+ if (activeNcConfiguration.containsKey(replica)) {
+ try {
+ messageBroker.sendApplicationMessageToNC(msg, replica);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
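A hedged usage sketch for this utility: a fault-tolerance strategy that wants to propagate a node failure to the replicas affected by it could call it as below. The strategy fields (clusterManager, messageBroker, replicationStrategy) are illustrative, and ClusterEventType.NODE_FAILURE is assumed to be the counterpart of the NODE_JOIN constant used above.

    // Inside an IFaultToleranceStrategy implementation:
    @Override
    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
        // Tell every remote primary replica of the failed node about the failure so it can react
        // (e.g. stop replicating to the failed node).
        FaultToleranceUtil.notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE,
                clusterManager, messageBroker, replicationStrategy);
    }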
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 8f0b694..7f78b26 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -19,14 +19,22 @@
<cluster xmlns="cluster">
<instance_name>asterix</instance_name>
<store>storage</store>
+ <metadata_node>nc1</metadata_node>
- <data_replication>
+ <high_availability>
<enabled>false</enabled>
- <replication_port>2016</replication_port>
- <replication_factor>2</replication_factor>
- <auto_failover>false</auto_failover>
- <replication_time_out>30</replication_time_out>
- </data_replication>
+ <data_replication>
+ <strategy>metadata_only</strategy>
+ <replication_port>2016</replication_port>
+ <replication_time_out>30</replication_time_out>
+ </data_replication>
+ <fault_tolerance>
+ <strategy>metadata_node</strategy>
+ <replica>
+ <node_id>nc2</node_id>
+ </replica>
+ </fault_tolerance>
+ </high_availability>
<master_node>
<id>master</id>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
index 74e019b..a614faa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/replication/replication.1.adm
@@ -1,11 +1,9 @@
{
"config" : {
- "enabled" : false,
- "factor" : 2,
"log.batchsize" : 4096,
"log.buffer.numpages" : 8,
"log.buffer.pagesize" : 131072,
"max.remote.recovery.attempts" : 5,
"timeout" : 30
}
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 04e30ff..ae19f23 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -23,6 +23,7 @@
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
@@ -102,4 +103,12 @@
* @throws HyracksDataException
*/
void closeUserDatasets() throws HyracksDataException;
+
+ /**
+ * Flushes all open datasets that match {@code replicationStrategy}.
+ *
+ * @param replicationStrategy
+ * @throws HyracksDataException
+ */
+ void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException;
}
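A small usage sketch for the new method (illustrative variable names): a caller that needs all replicated data durable on disk, for example before a metadata-node takeover, can flush exactly the datasets the configured strategy cares about.

    // Flush every open dataset that matches the configured replication strategy, converting any
    // replayed/replicated in-memory state into LSM disk components.
    IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
    datasetLifecycleManager.flushDataset(repStrategy);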
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
new file mode 100644
index 0000000..c30e999
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+@FunctionalInterface
+public interface INCLifecycleTask extends Serializable {
+
+ /**
+ * Performs the task.
+ *
+ * @param cs
+ * @throws HyracksDataException
+ */
+ void perform(IControllerService cs) throws HyracksDataException;
+}
\ No newline at end of file
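Illustrative only: because INCLifecycleTask is a @FunctionalInterface that extends Serializable, a trivial task can be written as a lambda and shipped from the CC to an NC as part of the startup-task response; real tasks are expected to be dedicated classes.

    INCLifecycleTask noopTask = cs -> {
        // cs is the controller service of the receiving NC (a NodeControllerService at runtime);
        // a real task would e.g. start the replication channel or run local recovery here.
    };
    noopTask.perform(controllerService);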
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
new file mode 100644
index 0000000..d971f48
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IClusterStateManager {
+
+ /**
+ * @return The current cluster state.
+ */
+ ClusterState getState();
+
+ /**
+ * Updates the cluster state based on the state of all cluster partitions and the metadata node.
+ * Cluster state after refresh:
+ * ACTIVE: all cluster partitions are active and the metadata node is bound.
+ * PENDING: all cluster partitions are active but the metadata node is not bound.
+ * UNUSABLE: one or more cluster partitions are not active.
+ */
+ void refreshState() throws HyracksDataException;
+
+ /**
+ * Sets the cluster state to {@code state}.
+ */
+ void setState(ClusterState state);
+
+ /**
+ * Updates all partitions of {@code nodeId} based on the {@code active} flag.
+ * @param nodeId
+ * @param active
+ * @throws HyracksDataException
+ */
+ void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException;
+
+ /**
+ * Updates the active node and active state of the cluster partition with id {@code partitionNum}
+ */
+ void updateClusterPartition(Integer partitionNum, String activeNode, boolean active);
+
+ /**
+ * Updates the metadata node id and its state.
+ */
+ void updateMetadataNode(String nodeId, boolean active);
+
+ /**
+ * @return a map of nodeId and NC Configuration for active nodes.
+ */
+ Map<String, Map<String, String>> getActiveNcConfiguration();
+
+ /**
+ * @return The current metadata node Id.
+ */
+ String getCurrentMetadataNodeId();
+
+ /**
+ * @param nodeId
+ * @return The partitions originally assigned to {@code nodeId}.
+ */
+ ClusterPartition[] getNodePartitions(String nodeId);
+
+ /**
+ * @return A copy of the current state of the cluster partitions.
+ */
+ ClusterPartition[] getClusterPartitons();
+}
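As a hedged example of how a fault-tolerance strategy is expected to drive this interface on a node failure (method names are from the interface above; clusterManager and failedNodeId are illustrative):

    // Mark the node's partitions inactive, unbind the metadata node if it was hosted there,
    // then recompute the cluster state (ACTIVE / PENDING / UNUSABLE as documented above).
    clusterManager.updateNodePartitions(failedNodeId, false);
    if (failedNodeId.equals(clusterManager.getCurrentMetadataNodeId())) {
        clusterManager.updateMetadataNode(failedNodeId, false);
    }
    clusterManager.refreshState();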
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
index 81c5a6d..980ad24 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
@@ -19,21 +19,27 @@
package org.apache.asterix.common.config;
import java.io.InputStream;
+import java.util.Optional;
+import java.util.stream.Collectors;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.event.schema.cluster.Replica;
+import org.apache.commons.lang3.StringUtils;
public class ClusterProperties {
public static final ClusterProperties INSTANCE = new ClusterProperties();
-
private static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-
- private final Cluster cluster;
+ private String nodeNamePrefix = StringUtils.EMPTY;
+ private Cluster cluster;
private ClusterProperties() {
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -42,11 +48,11 @@
JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
Unmarshaller unmarshaller = ctx.createUnmarshaller();
cluster = (Cluster) unmarshaller.unmarshal(is);
+ nodeNamePrefix = cluster.getInstanceName() + "_";
+ updateNodeIdToFullName();
} catch (JAXBException e) {
throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e);
}
- } else {
- cluster = null;
}
}
@@ -62,14 +68,41 @@
return DEFAULT_STORAGE_DIR_NAME;
}
- public boolean isReplicationEnabled() {
- if (cluster != null && cluster.getDataReplication() != null) {
- return cluster.getDataReplication().isEnabled();
- }
- return false;
+ public Node getNodeById(String nodeId) {
+ Optional<Node> matchingNode = cluster.getNode().stream().filter(node -> node.getId().equals(nodeId)).findAny();
+ return matchingNode.isPresent() ? matchingNode.get() : null;
}
- public boolean isAutoFailoverEnabled() {
- return isReplicationEnabled() && cluster.getDataReplication().isAutoFailover();
+ public int getNodeIndex(String nodeId) {
+ for (int i = 0; i < cluster.getNode().size(); i++) {
+ Node node = cluster.getNode().get(i);
+ if (node.getId().equals(nodeId)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public IReplicationStrategy getReplicationStrategy() {
+ return ReplicationStrategyFactory.create(cluster);
+ }
+
+ private String getNodeFullName(String nodeId) {
+ if (nodeId.startsWith(nodeNamePrefix)) {
+ return nodeId;
+ }
+ return nodeNamePrefix + nodeId;
+ }
+
+ private void updateNodeIdToFullName() {
+ cluster.getNode().forEach(node -> node.setId(getNodeFullName(node.getId())));
+ if (cluster.getMetadataNode() != null) {
+ cluster.setMetadataNode(getNodeFullName(cluster.getMetadataNode()));
+ }
+ if (cluster.getHighAvailability() != null && cluster.getHighAvailability().getFaultTolerance() != null
+ && cluster.getHighAvailability().getFaultTolerance().getReplica() != null) {
+ Replica replicas = cluster.getHighAvailability().getFaultTolerance().getReplica();
+ replicas.setNodeId(replicas.getNodeId().stream().map(this::getNodeFullName).collect(Collectors.toList()));
+ }
}
}
\ No newline at end of file
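Two behavioral notes that the ClusterProperties changes imply but do not spell out: node ids from cluster.xml are rewritten to their full form (instance-name prefix + id) at load time, and the replication strategy is built from the <high_availability> section. A hedged illustration:

    // With <instance_name>asterix</instance_name> and <node><id>nc1</id></node> in cluster.xml,
    // lookups must use the full node name after loading:
    Node nc1 = ClusterProperties.INSTANCE.getNodeById("asterix_nc1");   // "nc1" alone would return null
    int index = ClusterProperties.INSTANCE.getNodeIndex("asterix_nc1"); // -1 when the node is unknown

    // Built from <data_replication><strategy>; with "metadata_only" this is expected to be a
    // MetadataOnlyReplicationStrategy instance.
    IReplicationStrategy strategy = ClusterProperties.INSTANCE.getReplicationStrategy();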
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
index 164a525..cf2ce4f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
@@ -18,11 +18,10 @@
*/
package org.apache.asterix.common.config;
-import java.util.HashSet;
import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
@@ -31,16 +30,8 @@
public class ReplicationProperties extends AbstractProperties {
- private static final Logger LOGGER = Logger.getLogger(ReplicationProperties.class.getName());
-
-
private static final int REPLICATION_DATAPORT_DEFAULT = 2000;
- private static final String REPLICATION_ENABLED_KEY = "replication.enabled";
-
- private static final String REPLICATION_FACTOR_KEY = "replication.factor";
- private static final int REPLICATION_FACTOR_DEFAULT = 1;
-
private static final String REPLICATION_TIMEOUT_KEY = "replication.timeout";
private static final int REPLICATION_TIME_OUT_DEFAULT = 15;
@@ -60,205 +51,60 @@
private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128,
StorageUnit.KILOBYTE);
- private final String nodeNamePrefix;
private final Cluster cluster;
+ private final IReplicationStrategy repStrategy;
public ReplicationProperties(PropertiesAccessor accessor) {
super(accessor);
this.cluster = ClusterProperties.INSTANCE.getCluster();
-
- if (cluster != null) {
- nodeNamePrefix = cluster.getInstanceName() + "_";
- } else {
- nodeNamePrefix = "";
- }
- }
-
- @PropertyKey(REPLICATION_ENABLED_KEY)
- public boolean isReplicationEnabled() {
- return ClusterProperties.INSTANCE.isReplicationEnabled();
+ this.repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy();
}
public String getReplicaIPAddress(String nodeId) {
- if (cluster != null) {
- for (int i = 0; i < cluster.getNode().size(); i++) {
- Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
- return node.getClusterIp();
- }
- }
- }
- return NODE_IP_ADDRESS_DEFAULT;
+ Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+ return node != null ? node.getClusterIp() : NODE_IP_ADDRESS_DEFAULT;
}
public int getDataReplicationPort(String nodeId) {
- if (cluster != null && cluster.getDataReplication() != null) {
- for (int i = 0; i < cluster.getNode().size(); i++) {
- Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
- return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
- : cluster.getDataReplication().getReplicationPort().intValue();
- }
- }
+ Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+ if (node != null) {
+ return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
+ : cluster.getHighAvailability().getDataReplication().getReplicationPort().intValue();
}
return REPLICATION_DATAPORT_DEFAULT;
}
- public Set<Replica> getRemoteReplicas(String nodeId) {
- Set<Replica> remoteReplicas = new HashSet<>();;
-
- int numberOfRemoteReplicas = getReplicationFactor() - 1;
- //Using chained-declustering
- if (cluster != null) {
- int nodeIndex = -1;
- //find the node index in the cluster config
- for (int i = 0; i < cluster.getNode().size(); i++) {
- Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
- nodeIndex = i;
- break;
- }
- }
-
- if (nodeIndex == -1) {
- LOGGER.log(Level.WARNING,
- "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
- return null;
- }
-
- //find nodes to the right of this node
- for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
- remoteReplicas.add(getReplicaByNodeIndex(i));
- if (remoteReplicas.size() == numberOfRemoteReplicas) {
- break;
- }
- }
-
- //if not all remote replicas have been found, start from the beginning
- if (remoteReplicas.size() != numberOfRemoteReplicas) {
- for (int i = 0; i < cluster.getNode().size(); i++) {
- remoteReplicas.add(getReplicaByNodeIndex(i));
- if (remoteReplicas.size() == numberOfRemoteReplicas) {
- break;
- }
- }
- }
- }
- return remoteReplicas;
- }
-
- private Replica getReplicaByNodeIndex(int nodeIndex) {
- Node node = cluster.getNode().get(nodeIndex);
- Node replicaNode = new Node();
- replicaNode.setId(getRealCluserNodeID(node.getId()));
- replicaNode.setClusterIp(node.getClusterIp());
- return new Replica(replicaNode);
- }
-
public Replica getReplicaById(String nodeId) {
- int nodeIndex = -1;
- if (cluster != null) {
- for (int i = 0; i < cluster.getNode().size(); i++) {
- Node node = cluster.getNode().get(i);
-
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
- nodeIndex = i;
- break;
- }
- }
+ Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+ if (node != null) {
+ return new Replica(node);
}
-
- if (nodeIndex < 0) {
- return null;
- }
-
- return getReplicaByNodeIndex(nodeIndex);
+ return null;
}
public Set<String> getRemoteReplicasIds(String nodeId) {
- Set<String> remoteReplicasIds = new HashSet<>();
- Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
-
- for (Replica replica : remoteReplicas) {
- remoteReplicasIds.add(replica.getId());
- }
-
- return remoteReplicasIds;
+ return repStrategy.getRemoteReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet());
}
- public String getRealCluserNodeID(String nodeId) {
- return nodeNamePrefix + nodeId;
+ public Set<String> getRemotePrimaryReplicasIds(String nodeId) {
+ return repStrategy.getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet());
}
public Set<String> getNodeReplicasIds(String nodeId) {
- Set<String> replicaIds = new HashSet<>();
- replicaIds.add(nodeId);
- replicaIds.addAll(getRemoteReplicasIds(nodeId));
- return replicaIds;
- }
-
- @PropertyKey(REPLICATION_FACTOR_KEY)
- public int getReplicationFactor() {
- if (cluster != null) {
- if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
- return REPLICATION_FACTOR_DEFAULT;
- }
- return cluster.getDataReplication().getReplicationFactor().intValue();
- }
- return REPLICATION_FACTOR_DEFAULT;
+ Set<String> remoteReplicasIds = getRemoteReplicasIds(nodeId);
+ // The returned set includes the node itself
+ remoteReplicasIds.add(nodeId);
+ return remoteReplicasIds;
}
@PropertyKey(REPLICATION_TIMEOUT_KEY)
public int getReplicationTimeOut() {
if (cluster != null) {
- return cluster.getDataReplication().getReplicationTimeOut().intValue();
+ return cluster.getHighAvailability().getDataReplication().getReplicationTimeOut().intValue();
}
return REPLICATION_TIME_OUT_DEFAULT;
}
- /**
- * @param nodeId
- * @return The set of nodes which replicate to this node, including the node itself
- */
- public Set<String> getNodeReplicationClients(String nodeId) {
- Set<String> clientReplicas = new HashSet<>();
- clientReplicas.add(nodeId);
-
- int clientsCount = getReplicationFactor();
-
- //Using chained-declustering backwards
- if (cluster != null) {
- int nodeIndex = -1;
- //find the node index in the cluster config
- for (int i = 0; i < cluster.getNode().size(); i++) {
- Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
- nodeIndex = i;
- break;
- }
- }
-
- //find nodes to the left of this node
- for (int i = nodeIndex - 1; i >= 0; i--) {
- clientReplicas.add(getReplicaByNodeIndex(i).getId());
- if (clientReplicas.size() == clientsCount) {
- break;
- }
- }
-
- //if not all client replicas have been found, start from the end
- if (clientReplicas.size() != clientsCount) {
- for (int i = cluster.getNode().size() - 1; i >= 0; i--) {
- clientReplicas.add(getReplicaByNodeIndex(i).getId());
- if (clientReplicas.size() == clientsCount) {
- break;
- }
- }
- }
- }
- return clientReplicas;
- }
-
@PropertyKey(REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY)
public int getMaxRemoteRecoveryAttempts() {
return MAX_REMOTE_RECOVERY_ATTEMPTS;
@@ -281,4 +127,12 @@
return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT,
PropertyInterpreters.getIntegerBytePropertyInterpreter());
}
-}
+
+ public boolean isParticipant(String nodeId) {
+ return repStrategy.isParticipant(nodeId);
+ }
+
+ public IReplicationStrategy getReplicationStrategy() {
+ return repStrategy;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f49b07a..cbb4868 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.Resource;
@@ -571,4 +572,13 @@
public int getNumPartitions() {
return numPartitions;
}
+
+ @Override
+ public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+ for (DatasetResource dsr : datasets.values()) {
+ if (replicationStrategy.isMatch(dsr.getDatasetID())) {
+ flushDatasetOpenIndexes(dsr.getDatasetInfo(), false);
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
new file mode 100644
index 0000000..ad326b2
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class ChainedDeclusteringReplicationStrategy implements IReplicationStrategy {
+
+ private static final Logger LOGGER = Logger.getLogger(ChainedDeclusteringReplicationStrategy.class.getName());
+ private int replicationFactor;
+
+ @Override
+ public boolean isMatch(int datasetId) {
+ return true;
+ }
+
+ @Override
+ public Set<Replica> getRemoteReplicas(String nodeId) {
+ Set<Replica> remoteReplicas = new HashSet<>();
+ Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+ int numberOfRemoteReplicas = replicationFactor - 1;
+ int nodeIndex = ClusterProperties.INSTANCE.getNodeIndex(nodeId);
+
+ if (nodeIndex == -1) {
+ LOGGER.log(Level.WARNING, "Could not find node " + nodeId + " in cluster configurations");
+ return Collections.emptySet();
+ }
+
+ //find nodes to the right of this node
+ while (remoteReplicas.size() != numberOfRemoteReplicas) {
+ remoteReplicas.add(new Replica(cluster.getNode().get(++nodeIndex % cluster.getNode().size())));
+ }
+
+ return remoteReplicas;
+ }
+
+ @Override
+ public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+ Set<Replica> clientReplicas = new HashSet<>();
+ Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+ final int remotePrimaryReplicasCount = replicationFactor - 1;
+
+ int nodeIndex = ClusterProperties.INSTANCE.getNodeIndex(nodeId);
+
+ //find nodes to the left of this node
+ while (clientReplicas.size() != remotePrimaryReplicasCount) {
+ clientReplicas.add(new Replica(cluster.getNode().get(Math.abs(--nodeIndex % cluster.getNode().size()))));
+ }
+
+ return clientReplicas;
+ }
+
+ @Override
+ public ChainedDeclusteringReplicationStrategy from(Cluster cluster) {
+ if (cluster.getHighAvailability().getDataReplication().getReplicationFactor() == null) {
+ throw new IllegalStateException("Replication factor must be specified.");
+ }
+ ChainedDeclusteringReplicationStrategy cd = new ChainedDeclusteringReplicationStrategy();
+ cd.replicationFactor = cluster.getHighAvailability().getDataReplication().getReplicationFactor().intValue();
+ return cd;
+ }
+}
\ No newline at end of file
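To make the ring arithmetic above concrete, here is a standalone sketch (plain strings instead of the cluster Node/Replica types) of chained declustering with four nodes and a replication factor of 2: each node replicates to its right-hand neighbour, wrapping around at the end of the node list.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ChainedDeclusteringDemo {
        public static void main(String[] args) {
            List<String> nodes = Arrays.asList("nc1", "nc2", "nc3", "nc4");
            int replicationFactor = 2; // each node keeps one remote replica of its data

            for (int i = 0; i < nodes.size(); i++) {
                List<String> remoteReplicas = new ArrayList<>();
                int index = i;
                while (remoteReplicas.size() < replicationFactor - 1) {
                    index = (index + 1) % nodes.size(); // next node to the "right", with wrap-around
                    remoteReplicas.add(nodes.get(index));
                }
                System.out.println(nodes.get(i) + " replicates to " + remoteReplicas);
            }
            // Prints: nc1 replicates to [nc2], nc2 replicates to [nc3],
            //         nc3 replicates to [nc4], nc4 replicates to [nc1]
        }
    }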
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
new file mode 100644
index 0000000..46d5d98
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFaultToleranceStrategy {
+
+ /**
+ * Defines the logic of a {@link IFaultToleranceStrategy} when a node joins the cluster.
+ *
+ * @param nodeId
+ * @throws HyracksDataException
+ */
+ void notifyNodeJoin(String nodeId) throws HyracksDataException;
+
+ /**
+ * Defines the logic of a {@link IFaultToleranceStrategy} when a node leaves the cluster.
+ *
+ * @param nodeId
+ * @throws HyracksDataException
+ */
+ void notifyNodeFailure(String nodeId) throws HyracksDataException;
+
+ /**
+ * Binds the fault tolerance strategy to {@code clusterManager}.
+ *
+ * @param clusterManager
+ */
+ void bindTo(IClusterStateManager clusterManager);
+
+ /**
+ * Processes {@code message} based on the message type.
+ *
+ * @param message
+ * @throws HyracksDataException
+ */
+ void process(INCLifecycleMessage message) throws HyracksDataException;
+
+ /**
+ * Constructs a fault tolerance strategy.
+ *
+ * @param replicationStrategy
+ * @param messageBroker
+ * @return A fault tolerance strategy based on {@code replicationStrategy} that uses {@code messageBroker} to communicate with NCs.
+ */
+ IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker);
+
+}
\ No newline at end of file
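The interface is small enough that new fault-tolerance behaviors can be added without touching the cluster state manager. A minimal, hedged skeleton of a hypothetical custom strategy (it compiles against the interfaces introduced in this change but does nothing beyond updating partition state):

    import org.apache.asterix.common.cluster.IClusterStateManager;
    import org.apache.asterix.common.messaging.api.ICCMessageBroker;
    import org.apache.asterix.common.replication.IFaultToleranceStrategy;
    import org.apache.asterix.common.replication.INCLifecycleMessage;
    import org.apache.asterix.common.replication.IReplicationStrategy;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    public class NoOpFaultToleranceStrategy implements IFaultToleranceStrategy {

        private IClusterStateManager clusterManager;
        private ICCMessageBroker messageBroker;

        @Override
        public void bindTo(IClusterStateManager clusterManager) {
            this.clusterManager = clusterManager;
        }

        @Override
        public void notifyNodeJoin(String nodeId) throws HyracksDataException {
            clusterManager.updateNodePartitions(nodeId, true);
            clusterManager.refreshState();
        }

        @Override
        public void notifyNodeFailure(String nodeId) throws HyracksDataException {
            clusterManager.updateNodePartitions(nodeId, false);
            clusterManager.refreshState();
        }

        @Override
        public void process(INCLifecycleMessage message) throws HyracksDataException {
            // A real strategy dispatches on message.getType() here (see INCLifecycleMessage below).
        }

        @Override
        public IFaultToleranceStrategy from(IReplicationStrategy replicationStrategy, ICCMessageBroker messageBroker) {
            NoOpFaultToleranceStrategy ft = new NoOpFaultToleranceStrategy();
            ft.messageBroker = messageBroker;
            return ft;
        }
    }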
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
new file mode 100644
index 0000000..c19b0aa
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+
+public interface INCLifecycleMessage extends IApplicationMessage {
+
+ public enum MessageType {
+ REPLAY_LOGS_REQUEST,
+ REPLAY_LOGS_RESPONSE,
+ PREPARE_FAILBACK_REQUEST,
+ PREPARE_FAILBACK_RESPONSE,
+ COMPLETE_FAILBACK_REQUEST,
+ COMPLETE_FAILBACK_RESPONSE,
+ STARTUP_TASK_REQUEST,
+ STARTUP_TASK_RESPONSE,
+ STARTUP_TASK_RESULT,
+ TAKEOVER_PARTITION_REQUEST,
+ TAKEOVER_PARTITION_RESPONSE,
+ TAKEOVER_METADATA_NODE_REQUEST,
+ TAKEOVER_METADATA_NODE_RESPONSE
+ }
+
+ /**
+ * @return The message type.
+ */
+ MessageType getType();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 9f9d74b..51b826b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -19,8 +19,11 @@
package org.apache.asterix.common.replication;
import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IRemoteRecoveryManager {
@@ -46,4 +49,23 @@
* @throws InterruptedException
*/
public void completeFailbackProcess() throws IOException, InterruptedException;
+
+ /**
+ * Replays the logs of all committed jobs for {@code partitions}. Optionally, flushes all datasets
+ * to convert the replayed logs into LSM Components.
+ *
+ * @param partitions
+ * @param flush
+ * @throws HyracksDataException
+ */
+ void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException;
+
+ /**
+ * Performs the remote recovery plan by requesting data from each specified node
+ * for the partitions specified.
+ *
+ * @param recoveryPlan
+ * @throws HyracksDataException
+ */
+ void doRemoteRecoveryPlan(Map<String, Set<Integer>> recoveryPlan) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 6bd1505..b969bef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -50,13 +50,13 @@
*
* @param remoteReplicaId
* The replica id to send the request to.
- * @param replicasDataToRecover
- * Get files that belong to those replicas.
+ * @param partitionsToRecover
+ * Get files that belong to those partitions.
* @param existingFiles
* a list of already existing files on the requester
* @throws IOException
*/
- public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover,
+ public void requestReplicaFiles(String remoteReplicaId, Set<Integer> partitionsToRecover,
Set<String> existingFiles) throws IOException;
/**
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
new file mode 100644
index 0000000..f65f6ac
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Set;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public interface IReplicationStrategy {
+
+ /**
+ * @param datasetId
+ * @return True, if the dataset should be replicated. Otherwise false.
+ */
+ boolean isMatch(int datasetId);
+
+ /**
+ * @param nodeId
+ * @return The set of nodes that replicate data on {@code nodeId}.
+ */
+ Set<Replica> getRemotePrimaryReplicas(String nodeId);
+
+ /**
+ * @param node
+ * @return The set of nodes that {@code node} replicates data to.
+ */
+ Set<Replica> getRemoteReplicas(String node);
+
+ /**
+ * @param nodeId
+ * @return true if {@code nodeId} has any remote primary replica or remote replica. Otherwise false.
+ */
+ default boolean isParticipant(String nodeId) {
+ return !getRemoteReplicas(nodeId).isEmpty() || !getRemotePrimaryReplicas(nodeId).isEmpty();
+ }
+
+ /**
+ * @param cluster
+ * @return A replication strategy based on the passed configurations.
+ */
+ IReplicationStrategy from(Cluster cluster);
+}
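A short usage sketch (illustrative variable names): the strategy is consulted both for dataset filtering (isMatch on a dataset id) and for topology (who replicates to whom), so components that previously checked a single replication-enabled flag can now ask more precise questions.

    IReplicationStrategy strategy = ClusterProperties.INSTANCE.getReplicationStrategy();

    // Dataset-level filtering: with metadata_only, only metadata datasets match.
    boolean replicated = strategy.isMatch(datasetId);

    // Topology: where does this node ship its data, and who ships data to it?
    Set<Replica> targets = strategy.getRemoteReplicas(nodeId);
    Set<Replica> sources = strategy.getRemotePrimaryReplicas(nodeId);

    // A node participates in replication iff it has targets or sources (default isParticipant()).
    boolean participates = strategy.isParticipant(nodeId);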
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
new file mode 100644
index 0000000..711f06d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class MetadataOnlyReplicationStrategy implements IReplicationStrategy {
+
+ private String metadataNodeId;
+ private Replica metadataPrimaryReplica;
+ private Set<Replica> metadataNodeReplicas;
+
+ @Override
+ public boolean isMatch(int datasetId) {
+ return datasetId < MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID && datasetId >= 0;
+ }
+
+ @Override
+ public Set<Replica> getRemoteReplicas(String nodeId) {
+ if (nodeId.equals(metadataNodeId)) {
+ return metadataNodeReplicas;
+ }
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+ if (metadataNodeReplicas.stream().anyMatch(replica -> replica.getId().equals(nodeId))) {
+ return new HashSet<>(Arrays.asList(metadataPrimaryReplica));
+ }
+ return Collections.emptySet();
+ }
+
+ @Override
+ public MetadataOnlyReplicationStrategy from(Cluster cluster) {
+ if (cluster.getMetadataNode() == null) {
+ throw new IllegalStateException("Metadata node must be specified.");
+ }
+
+ Node metadataNode = ClusterProperties.INSTANCE.getNodeById(cluster.getMetadataNode());
+ if (metadataNode == null) {
+ throw new IllegalStateException("Invalid metadata node specified");
+ }
+
+ if (cluster.getHighAvailability().getFaultTolerance().getReplica() == null
+ || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId() == null
+ || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId().isEmpty()) {
+ throw new IllegalStateException("One or more replicas must be specified for metadata node.");
+ }
+
+ final Set<Replica> replicas = new HashSet<>();
+ for (String nodeId : cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId()) {
+ Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+ if (node == null) {
+ throw new IllegalStateException("Invalid replica specified: " + nodeId);
+ }
+ replicas.add(new Replica(node));
+ }
+ MetadataOnlyReplicationStrategy st = new MetadataOnlyReplicationStrategy();
+ st.metadataNodeId = cluster.getMetadataNode();
+ st.metadataPrimaryReplica = new Replica(metadataNode);
+ st.metadataNodeReplicas = replicas;
+ return st;
+ }
+}
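
A small illustration of the matching rule above, assuming a cluster description whose data_replication strategy is set to metadata_only; the cluster variable is a placeholder.

    IReplicationStrategy s = ReplicationStrategyFactory.create(cluster);
    // Metadata dataset ids lie below the first user dataset id, so only they match.
    boolean metadataDataset = s.isMatch(0);                                                            // true
    boolean userDataset = s.isMatch(MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); // false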
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
new file mode 100644
index 0000000..43347f6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class NoReplicationStrategy implements IReplicationStrategy {
+
+ @Override
+ public boolean isMatch(int datasetId) {
+ return false;
+ }
+
+ @Override
+ public boolean isParticipant(String nodeId) {
+ return false;
+ }
+
+ @Override
+ public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<Replica> getRemoteReplicas(String node) {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public NoReplicationStrategy from(Cluster cluster) {
+ return new NoReplicationStrategy();
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index bd77778..267a22d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -36,11 +36,13 @@
UNKNOWN
}
- final Node node;
+ private final Node node;
private ReplicaState state = ReplicaState.UNKNOWN;
public Replica(Node node) {
- this.node = node;
+ this.node = new Node();
+ this.node.setId(node.getId());
+ this.node.setClusterIp(node.getClusterIp());
}
public ReplicaState getState() {
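
A short sketch of the defensive copy introduced in the constructor above; the node id and the new IP are made up.

    Node configured = ClusterProperties.INSTANCE.getNodeById("nc1");
    Replica replica = new Replica(configured);
    configured.setClusterIp("10.0.0.99"); // a later change to the cluster description...
    // ...does not affect the id and cluster IP already captured by the Replica.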
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
new file mode 100644
index 0000000..b61b38a
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.event.schema.cluster.Cluster;
+
+public class ReplicationStrategyFactory {
+
+ private static final Map<String, Class<? extends IReplicationStrategy>>
+ BUILT_IN_REPLICATION_STRATEGY = new HashMap<>();
+
+ static {
+ BUILT_IN_REPLICATION_STRATEGY.put("no_replication", NoReplicationStrategy.class);
+ BUILT_IN_REPLICATION_STRATEGY.put("chained_declustering", ChainedDeclusteringReplicationStrategy.class);
+ BUILT_IN_REPLICATION_STRATEGY.put("metadata_only", MetadataOnlyReplicationStrategy.class);
+ }
+
+ private ReplicationStrategyFactory() {
+ throw new AssertionError();
+ }
+
+ public static IReplicationStrategy create(Cluster cluster) {
+ boolean highAvailabilityEnabled = cluster.getHighAvailability() != null
+ && cluster.getHighAvailability().getEnabled() != null
+ && Boolean.valueOf(cluster.getHighAvailability().getEnabled());
+
+ if (!highAvailabilityEnabled || cluster.getHighAvailability().getDataReplication() == null
+ || cluster.getHighAvailability().getDataReplication().getStrategy() == null) {
+ return new NoReplicationStrategy();
+ }
+ String strategyName = cluster.getHighAvailability().getDataReplication().getStrategy().toLowerCase();
+ if (!BUILT_IN_REPLICATION_STRATEGY.containsKey(strategyName)) {
+ throw new IllegalArgumentException(String.format("Unsupported Replication Strategy. Available types: %s",
+ BUILT_IN_REPLICATION_STRATEGY.keySet().toString()));
+ }
+ Class<? extends IReplicationStrategy> clazz = BUILT_IN_REPLICATION_STRATEGY.get(strategyName);
+ try {
+ return clazz.newInstance().from(cluster);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
\ No newline at end of file
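
A sketch of the resolution performed by create(), with cluster standing for the parsed cluster description file.

    IReplicationStrategy strategy = ReplicationStrategyFactory.create(cluster);
    // <strategy>metadata_only</strategy>          -> MetadataOnlyReplicationStrategy
    // <strategy>chained_declustering</strategy>   -> ChainedDeclusteringReplicationStrategy
    // <high_availability> absent or not enabled   -> NoReplicationStrategy
    // any other name                              -> IllegalArgumentException listing the supported names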
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
new file mode 100644
index 0000000..ca6968f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.File;
+
+import org.apache.asterix.common.utils.StoragePathUtil;
+
+/**
+ * A holder class for an index file's properties.
+ */
+public class IndexFileProperties {
+
+ private final String fileName;
+ private final String idxName;
+ private final String dataverseName;
+ private final int partitionId;
+ private final int datasetId;
+
+ public IndexFileProperties(int partitionId, String dataverseName, String idxName, String fileName, int datasetId) {
+ this.partitionId = partitionId;
+ this.dataverseName = dataverseName;
+ this.idxName = idxName;
+ this.fileName = fileName;
+ this.datasetId = datasetId;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public String getIdxName() {
+ return idxName;
+ }
+
+ public String getDataverseName() {
+ return dataverseName;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ public int getDatasetId() {
+ return datasetId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(StoragePathUtil.PARTITION_DIR_PREFIX + partitionId + File.separator);
+ sb.append(dataverseName + File.separator);
+ sb.append(idxName + File.separator);
+ sb.append(fileName);
+ sb.append(" [Dataset ID: " + datasetId + "]");
+ return sb.toString();
+ }
+}
\ No newline at end of file
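
A quick usage sketch with made-up values, assuming the usual partition_ directory prefix and "/" as the file separator.

    IndexFileProperties props = new IndexFileProperties(0, "Social", "Users_idx", "2017-01-10_b.btree", 101);
    // props.toString() -> "partition_0/Social/Users_idx/2017-01-10_b.btree [Dataset ID: 101]"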
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 6816116..3e85276 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -34,6 +34,7 @@
public interface IRecoveryManager {
public enum SystemState {
+ INITIAL_RUN,
NEW_UNIVERSE,
RECOVERING,
HEALTHY,
@@ -120,4 +121,6 @@
* Deletes all temporary recovery files
*/
public void deleteRecoveryTemporaryFiles();
+
+ void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 9f2e3e7..46cd476 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -78,6 +78,17 @@
}
/**
+ * @param fileAbsolutePath
+ * @return the file's relative path starting from the partition directory
+ */
+ public static String getIndexFileRelativePath(String fileAbsolutePath) {
+ String[] tokens = fileAbsolutePath.split(File.separator);
+ //partition/dataverse/idx/fileName
+ return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
+ + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+ }
+
+ /**
* Create a file
* Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
* creating files simultaneously
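
A worked example of getIndexFileRelativePath with a made-up absolute path; only the last four segments (partition/dataverse/idx/fileName) are kept.

    String abs = "/lv/iodevice0/storage/partition_0/Social/Users/2017-01-10_b.btree";
    String rel = StoragePathUtil.getIndexFileRelativePath(abs);
    // rel -> "partition_0/Social/Users/2017-01-10_b.btree" (where File.separator is "/")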
diff --git a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
index 79c377a..098b4e7 100644
--- a/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
@@ -44,7 +44,7 @@
<xs:element name="http_port" type="xs:integer" />
<xs:element name="debug_port" type="xs:integer" />
<xs:element name="metadata_node" type="xs:string" />
- <xs:element name="enabled" type="xs:boolean" />
+ <xs:element name="enabled" type="xs:string" />
<xs:element name="replication_port" type="xs:integer" />
<xs:element name="replication_factor" type="xs:integer" />
<xs:element name="auto_failover" type="xs:boolean" />
@@ -57,6 +57,8 @@
<xs:element name="result_time_to_live" type="xs:long" />
<xs:element name="result_sweep_threshold" type="xs:long" />
<xs:element name="cc_root" type="xs:string" />
+ <xs:element name="strategy" type="xs:string" />
+ <xs:element name="node_id" type="xs:string" />
<!-- definition of complex elements -->
<xs:element name="working_dir">
@@ -87,15 +89,33 @@
<xs:element name="data_replication">
<xs:complexType>
<xs:sequence>
- <xs:element ref="cl:enabled" />
+ <xs:element ref="cl:strategy" />
<xs:element ref="cl:replication_port" />
<xs:element ref="cl:replication_factor" />
- <xs:element ref="cl:auto_failover" />
<xs:element ref="cl:replication_time_out" />
</xs:sequence>
</xs:complexType>
</xs:element>
+ <xs:element name="fault_tolerance">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref="cl:strategy" />
+ <xs:element ref="cl:replica" minOccurs="0"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
+ <xs:element name="high_availability">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref="cl:enabled" minOccurs="0"/>
+ <xs:element ref="cl:data_replication" minOccurs="0"/>
+ <xs:element ref="cl:fault_tolerance" minOccurs="0"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
<xs:element name="property">
<xs:complexType>
<xs:sequence>
@@ -136,6 +156,14 @@
</xs:complexType>
</xs:element>
+ <xs:element name="replica">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref="cl:node_id" maxOccurs="unbounded" />
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
<xs:element name="cluster">
<xs:complexType>
<xs:sequence>
@@ -150,7 +178,7 @@
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:working_dir" />
<xs:element ref="cl:metadata_node" />
- <xs:element ref="cl:data_replication" minOccurs="0" />
+ <xs:element ref="cl:high_availability" minOccurs="0" />
<xs:element ref="cl:master_node" />
<xs:element ref="cl:node" maxOccurs="unbounded" />
<xs:element ref="cl:substitute_nodes" />
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 288a739..8213213 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -76,7 +76,6 @@
import org.apache.http.util.EntityUtils;
import org.apache.hyracks.util.StorageUtil;
import org.junit.Assert;
-
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
@@ -1108,6 +1107,21 @@
}
ProcessBuilder pb = new ProcessBuilder("kill", "-9", Integer.toString(nodePid));
pb.start().waitFor();
+ // Delete NC's transaction logs to re-initialize it as a new NC.
+ deleteNCTxnLogs(nodeId, cUnit);
+ }
+
+ private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws Exception {
+ OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
+ String endpoint = "/admin/cluster";
+ InputStream executeJSONGet = executeJSONGet(fmt, new URI("http://" + host + ":" + port + endpoint));
+ StringWriter actual = new StringWriter();
+ IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
+ String config = actual.toString();
+ ObjectMapper om = new ObjectMapper();
+ String logDir = om.readTree(config).findPath("transaction.log.dirs").get(nodeId).asText();
+ ProcessBuilder pb = new ProcessBuilder("rm", "-rf", logDir);
+ pb.start().waitFor();
}
public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb,
diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java
index 6d2b4b9..eac7586 100644
--- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java
+++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ConfigureCommand.java
@@ -44,7 +44,8 @@
@Override
protected void execCommand() throws Exception {
configureCluster("local", "local.xml");
- configureCluster("local", "local_with_replication.xml");
+ configureCluster("local", "local_chained_declustering_rep.xml");
+ configureCluster("local", "local_metadata_only_rep.xml");
configureCluster("demo", "demo.xml");
String installerConfPath = InstallerDriver.getManagixHome() + File.separator + InstallerDriver.MANAGIX_CONF_XML;
diff --git a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 4037eaf..748d811 100644
--- a/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -262,40 +262,16 @@
boolean valid = true;
//if replication is disabled, no need to validate the settings
- if (cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) {
-
- if (cluster.getDataReplication().getReplicationFactor() == null) {
- if (cluster.getNode().size() >= 3) {
- LOGGER.warn("Replication factor not defined. Using default value (3) " + WARNING);
-
- } else {
- valid = false;
- LOGGER.fatal("Replication factor not defined for data repliaction. " + ERROR);
- }
-
- }
-
- //replication factor = 1 means no replication
- if (cluster.getDataReplication().getReplicationFactor().intValue() == 1) {
- LOGGER.warn("Replication factor is set to 1. Disabling data replication" + WARNING);
- return true;
- }
-
- if (cluster.getDataReplication().getReplicationFactor().intValue() > cluster.getNode().size()) {
- LOGGER.fatal("Replication factor = " + cluster.getDataReplication().getReplicationFactor().intValue()
- + " requires at least " + cluster.getDataReplication().getReplicationFactor().intValue()
- + " nodes in the cluster" + ERROR);
- valid = false;
- }
-
- if (cluster.getDataReplication().getReplicationPort() == null
- || cluster.getDataReplication().getReplicationPort().toString().length() == 0) {
+ if (cluster.getHighAvailability() != null && cluster.getHighAvailability().getDataReplication() != null) {
+ if (cluster.getHighAvailability().getDataReplication().getReplicationPort() == null || cluster
+ .getHighAvailability().getDataReplication().getReplicationPort().toString().length() == 0) {
valid = false;
LOGGER.fatal("Replication data port not defined for data repliaction. " + ERROR);
}
- if (cluster.getDataReplication().getReplicationTimeOut() == null
- || (cluster.getDataReplication().getReplicationTimeOut().intValue() + "").length() == 0) {
+ if (cluster.getHighAvailability().getDataReplication().getReplicationTimeOut() == null || String
+ .valueOf(cluster.getHighAvailability().getDataReplication().getReplicationTimeOut().intValue())
+ .length() == 0) {
LOGGER.warn("Replication maximum wait time not defined. Using default value (60 seconds) " + WARNING);
}
diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
similarity index 89%
rename from asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml
rename to asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
index e6a3547..954a311 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_chained_declustering_rep.xml
@@ -43,13 +43,18 @@
<metadata_node>nc1</metadata_node>
- <data_replication>
+ <high_availability>
<enabled>true</enabled>
- <replication_port>2000</replication_port>
- <replication_factor>2</replication_factor>
- <auto_failover>true</auto_failover>
- <replication_time_out>10</replication_time_out>
- </data_replication>
+ <data_replication>
+ <strategy>chained_declustering</strategy>
+ <replication_port>2000</replication_port>
+ <replication_factor>2</replication_factor>
+ <replication_time_out>30</replication_time_out>
+ </data_replication>
+ <fault_tolerance>
+ <strategy>auto</strategy>
+ </fault_tolerance>
+ </high_availability>
<master_node>
<id>master</id>
diff --git a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
similarity index 88%
copy from asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml
copy to asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
index e6a3547..7a435b7 100644
--- a/asterixdb/asterix-installer/src/main/resources/clusters/local/local_with_replication.xml
+++ b/asterixdb/asterix-installer/src/main/resources/clusters/local/local_metadata_only_rep.xml
@@ -43,13 +43,20 @@
<metadata_node>nc1</metadata_node>
- <data_replication>
+ <high_availability>
<enabled>true</enabled>
- <replication_port>2000</replication_port>
- <replication_factor>2</replication_factor>
- <auto_failover>true</auto_failover>
- <replication_time_out>10</replication_time_out>
- </data_replication>
+ <data_replication>
+ <strategy>metadata_only</strategy>
+ <replication_port>2000</replication_port>
+ <replication_time_out>30</replication_time_out>
+ </data_replication>
+ <fault_tolerance>
+ <strategy>metadata_node</strategy>
+ <replica>
+ <node_id>nc2</node_id>
+ </replica>
+ </fault_tolerance>
+ </high_availability>
<master_node>
<id>master</id>
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
index e840796..bfe63be 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixInstallerIntegrationUtil.java
@@ -55,8 +55,10 @@
private static IHyracksClientConnection hcc;
private static final String CLUSTERS_BASE_PATH = "clusters" + File.separator + "local" + File.separator;
public static final String LOCAL_CLUSTER_PATH = CLUSTERS_BASE_PATH + File.separator + "local.xml";
- public static final String LOCAL_CLUSTER_WITH_REPLICATION_PATH = CLUSTERS_BASE_PATH + File.separator
- + "local_with_replication.xml";
+ public static final String LOCAL_CLUSTER_CHAINED_DECLUSTERING_REP_PATH = CLUSTERS_BASE_PATH + File.separator
+ + "local_chained_declustering_rep.xml";
+ public static final String LOCAL_CLUSTER_METADATA_ONLY_REP_PATH = CLUSTERS_BASE_PATH + File.separator
+ + "local_metadata_only_rep.xml";
public static void deinit() throws Exception {
deleteInstance();
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java
new file mode 100644
index 0000000..56ac759
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.installer.test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.event.model.AsterixInstance.State;
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.test.base.RetainLogsRule;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MetadataReplicationIT {
+
+ private static final String PATH_BASE = "src/test/resources/integrationts/metadata_only_replication/";
+ private static final String PATH_ACTUAL = "target" + File.separator + "ittest" + File.separator;
+ private static final Logger LOGGER = Logger.getLogger(MetadataReplicationIT.class.getName());
+ private static String reportPath = new File(
+ StringUtils.join(new String[] { "target", "failsafe-reports" }, File.separator)).getAbsolutePath();
+
+ private final TestExecutor testExecutor = new TestExecutor();
+ private TestCaseContext tcCtx;
+ private static String scriptHomePath;
+ private static File asterixInstallerPath;
+ private static ProcessBuilder pb;
+ private static Map<String, String> env;
+
+ public MetadataReplicationIT(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Rule
+ public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ try {
+ pb = new ProcessBuilder();
+ env = pb.environment();
+ asterixInstallerPath = new File(System.getProperty("user.dir"));
+ scriptHomePath = asterixInstallerPath + File.separator + "src" + File.separator + "test" + File.separator
+ + "resources" + File.separator + "integrationts" + File.separator + "metadata_only_replication" + File.separator
+ + "scripts";
+ env.put("SCRIPT_HOME", scriptHomePath);
+ } catch (Throwable th) {
+ th.printStackTrace();
+ throw th;
+ }
+ }
+
+ @Before
+ public void before() throws Exception {
+ LOGGER.info("Creating new instance...");
+ AsterixInstallerIntegrationUtil.init(AsterixInstallerIntegrationUtil.LOCAL_CLUSTER_METADATA_ONLY_REP_PATH);
+ LOGGER.info("Instacne created.");
+ AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
+ LOGGER.info("Instance is in ACTIVE state.");
+ }
+
+ @After
+ public void after() throws Exception {
+ LOGGER.info("Destroying instance...");
+ AsterixInstallerIntegrationUtil.deinit();
+ LOGGER.info("Instance destroyed.");
+ }
+
+ @Test
+ public void test() throws Exception {
+ testExecutor.executeTest(PATH_ACTUAL, tcCtx, pb, false);
+ }
+
+ @Parameterized.Parameters(name = "MetadataReplicationIT {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ Collection<Object[]> testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+ return testArgs;
+ }
+
+ protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+ Collection<Object[]> testArgs = new ArrayList<>();
+ TestCaseContext.Builder b = new TestCaseContext.Builder();
+ for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+ testArgs.add(new Object[] { ctx });
+ }
+ return testArgs;
+ }
+}
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
index 5f47849..5c8a68a 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
@@ -80,7 +80,7 @@
@Before
public void before() throws Exception {
LOGGER.info("Creating new instance...");
- AsterixInstallerIntegrationUtil.init(AsterixInstallerIntegrationUtil.LOCAL_CLUSTER_WITH_REPLICATION_PATH);
+ AsterixInstallerIntegrationUtil.init(AsterixInstallerIntegrationUtil.LOCAL_CLUSTER_CHAINED_DECLUSTERING_REP_PATH);
LOGGER.info("Instacne created.");
AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
LOGGER.info("Instance is in ACTIVE state.");
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.1.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.1.ddl.aql
new file mode 100644
index 0000000..725ed61
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.1.ddl.aql
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+drop dataverse Social if exists;
+create dataverse Social;
+use dataverse Social;
+
+create type EmploymentType as open {
+ organization-name: string,
+ start-date: date,
+ end-date: date?
+}
+
+create type UserType as closed {
+ id: int,
+ alias: string,
+ name: string,
+ user-since: datetime,
+ friend-ids: {{ int32 }},
+ employment: [EmploymentType]
+}
+
+create dataset Users(UserType)
+primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.10.node.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.10.node.aql
new file mode 100644
index 0000000..f4685df
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.10.node.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+kill asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.11.sleep.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.11.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.11.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.12.mgx.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.12.mgx.aql
new file mode 100644
index 0000000..0e7bef0
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.12.mgx.aql
@@ -0,0 +1 @@
+startnode -n asterix -nodes nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.13.sleep.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.13.sleep.aql
new file mode 100644
index 0000000..d0000aa
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.13.sleep.aql
@@ -0,0 +1 @@
+20000
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.14.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.14.ddl.aql
new file mode 100644
index 0000000..eee7423
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.14.ddl.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+use dataverse Social;
+
+create dataset UsersAfterReplicaRecovery(UserType)
+primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.15.node.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.15.node.aql
new file mode 100644
index 0000000..720f33e
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.15.node.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+kill asterix_nc1
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.16.sleep.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.16.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.16.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.17.mgx.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.17.mgx.aql
new file mode 100644
index 0000000..67b492c
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.17.mgx.aql
@@ -0,0 +1 @@
+startnode -n asterix -nodes nc1
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.18.sleep.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.18.sleep.aql
new file mode 100644
index 0000000..d0000aa
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.18.sleep.aql
@@ -0,0 +1 @@
+20000
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.19.query.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.19.query.aql
new file mode 100644
index 0000000..5da6c59
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.19.query.aql
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+count(
+for $x in dataset Metadata.Dataset
+where $x.DatasetName ='UsersAfterRecovery'
+or $x.DatasetName ='UsersAfterReplicaRecovery'
+return $x.DatasetName
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.2.node.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.2.node.aql
new file mode 100644
index 0000000..720f33e
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.2.node.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+kill asterix_nc1
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.3.sleep.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.3.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.3.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.4.get.http b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.4.get.http
new file mode 100644
index 0000000..fdacfd0
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.4.get.http
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+/admin/cluster/summary
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.5.mgx.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.5.mgx.aql
new file mode 100644
index 0000000..67b492c
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.5.mgx.aql
@@ -0,0 +1 @@
+startnode -n asterix -nodes nc1
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.6.sleep.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.6.sleep.aql
new file mode 100644
index 0000000..d0000aa
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.6.sleep.aql
@@ -0,0 +1 @@
+20000
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.7.get.http b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.7.get.http
new file mode 100644
index 0000000..fdacfd0
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.7.get.http
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+/admin/cluster/summary
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.8.query.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.8.query.aql
new file mode 100644
index 0000000..778e9f3
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.8.query.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+count(
+for $x in dataset Metadata.Dataset
+where $x.DatasetName ='Users'
+return $x.DatasetName
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.9.ddl.aql b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.9.ddl.aql
new file mode 100644
index 0000000..376df70
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/queries/metadata_recovery/metadata_node_recovery/metadata_node_recovery.9.ddl.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Make sure metadata only replication completes as expected.
+ The test goes as follows:
+ start metadata node and a replica, update metadata, kill metadata node,
+ start metadata node again, validate metadata, update metadata, kill replica,
+ start replica, update metadata, kill metadata node, start metadata node,
+ validate metadata.
+ * Expected Result : Success
+ * Date : January 10 2017
+ */
+use dataverse Social;
+
+create dataset UsersAfterRecovery(UserType)
+primary key id;
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
new file mode 100644
index 0000000..d6ea4b7
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
@@ -0,0 +1,34 @@
+{
+ "metadata_node" : "asterix_nc1",
+ "partitions" : {
+ "0" : {
+ "partitionId" : 0,
+ "nodeId" : "asterix_nc1",
+ "activeNodeId" : "asterix_nc1",
+ "active" : false,
+ "iodeviceNum" : 0
+ },
+ "1" : {
+ "partitionId" : 1,
+ "nodeId" : "asterix_nc1",
+ "activeNodeId" : "asterix_nc1",
+ "active" : false,
+ "iodeviceNum" : 1
+ },
+ "2" : {
+ "partitionId" : 2,
+ "nodeId" : "asterix_nc2",
+ "activeNodeId" : "asterix_nc2",
+ "active" : true,
+ "iodeviceNum" : 0
+ },
+ "3" : {
+ "partitionId" : 3,
+ "nodeId" : "asterix_nc2",
+ "activeNodeId" : "asterix_nc2",
+ "active" : true,
+ "iodeviceNum" : 1
+ }
+ },
+ "state" : "UNUSABLE"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
new file mode 100644
index 0000000..579caac
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
@@ -0,0 +1,34 @@
+{
+ "metadata_node" : "asterix_nc1",
+ "partitions" : {
+ "0" : {
+ "partitionId" : 0,
+ "nodeId" : "asterix_nc1",
+ "activeNodeId" : "asterix_nc1",
+ "active" : true,
+ "iodeviceNum" : 0
+ },
+ "1" : {
+ "partitionId" : 1,
+ "nodeId" : "asterix_nc1",
+ "activeNodeId" : "asterix_nc1",
+ "active" : true,
+ "iodeviceNum" : 1
+ },
+ "2" : {
+ "partitionId" : 2,
+ "nodeId" : "asterix_nc2",
+ "activeNodeId" : "asterix_nc2",
+ "active" : true,
+ "iodeviceNum" : 0
+ },
+ "3" : {
+ "partitionId" : 3,
+ "nodeId" : "asterix_nc2",
+ "activeNodeId" : "asterix_nc2",
+ "active" : true,
+ "iodeviceNum" : 1
+ }
+ },
+ "state" : "ACTIVE"
+}
\ No newline at end of file
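The two expected cluster-state results above encode the rule that this change moves into ClusterStateManager.refreshState(): any inactive partition makes the cluster UNUSABLE, while a fully active partition map is PENDING until the metadata node is up and ACTIVE afterwards. A minimal standalone sketch of that rule (the enum and boolean flags are simplifications for illustration, not the real ClusterPartition API):

import java.util.Arrays;
import java.util.List;

public class ClusterStateSketch {
    enum ClusterState { UNUSABLE, PENDING, ACTIVE }

    // Simplified stand-in for iterating ClusterPartition.isActive() over all partitions.
    static ClusterState computeState(List<Boolean> partitionActiveFlags, boolean metadataNodeActive) {
        for (boolean active : partitionActiveFlags) {
            if (!active) {
                return ClusterState.UNUSABLE; // any inactive partition leaves the cluster unusable
            }
        }
        // All partitions active: PENDING until the metadata node is active, then ACTIVE.
        return metadataNodeActive ? ClusterState.ACTIVE : ClusterState.PENDING;
    }

    public static void main(String[] args) {
        // Mirrors cluster_state.4.adm (nc1 partitions inactive) and cluster_state.7.adm (all active).
        System.out.println(computeState(Arrays.asList(false, false, true, true), false)); // UNUSABLE
        System.out.println(computeState(Arrays.asList(true, true, true, true), true));    // ACTIVE
    }
}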
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.query.19.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.query.19.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.query.19.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.query.8.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.query.8.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.query.8.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/testsuite.xml b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/testsuite.xml
new file mode 100644
index 0000000..d0ac325
--- /dev/null
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/testsuite.xml
@@ -0,0 +1,27 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
+ <test-group name="metadata_recovery">
+ <test-case FilePath="metadata_recovery">
+ <compilation-unit name="metadata_node_recovery">
+ <output-dir compare="Text">metadata_node_recovery</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
index 7502737..f5b2378 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicaFilesRequest.java
@@ -26,19 +26,19 @@
import java.util.Set;
public class ReplicaFilesRequest {
- private final Set<String> replicaIds;
+ private final Set<Integer> partitionIds;
private final Set<String> existingFiles;
- public ReplicaFilesRequest(Set<String> replicaIds, Set<String> existingFiles) {
- this.replicaIds = replicaIds;
+ public ReplicaFilesRequest(Set<Integer> partitionIds, Set<String> existingFiles) {
+ this.partitionIds = partitionIds;
this.existingFiles = existingFiles;
}
public void serialize(OutputStream out) throws IOException {
DataOutputStream dos = new DataOutputStream(out);
- dos.writeInt(replicaIds.size());
- for (String replicaId : replicaIds) {
- dos.writeUTF(replicaId);
+ dos.writeInt(partitionIds.size());
+ for (Integer partitionId : partitionIds) {
+ dos.writeInt(partitionId);
}
dos.writeInt(existingFiles.size());
for (String fileName : existingFiles) {
@@ -48,20 +48,20 @@
public static ReplicaFilesRequest create(DataInput input) throws IOException {
int size = input.readInt();
- Set<String> replicaIds = new HashSet<String>(size);
+ Set<Integer> partitionIds = new HashSet<>(size);
for (int i = 0; i < size; i++) {
- replicaIds.add(input.readUTF());
+ partitionIds.add(input.readInt());
}
int filesCount = input.readInt();
- Set<String> existingFiles = new HashSet<String>(filesCount);
+ Set<String> existingFiles = new HashSet<>(filesCount);
for (int i = 0; i < filesCount; i++) {
existingFiles.add(input.readUTF());
}
- return new ReplicaFilesRequest(replicaIds, existingFiles);
+ return new ReplicaFilesRequest(partitionIds, existingFiles);
}
- public Set<String> getReplicaIds() {
- return replicaIds;
+ public Set<Integer> getPartitionIds() {
+ return partitionIds;
}
public Set<String> getExistingFiles() {
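With this change the replica-files request wire format is: an int count of partition ids followed by the ids as ints, then an int count of file names followed by the names as UTF strings. A minimal round-trip sketch against the new API (the partition ids and the file path below are made up for illustration):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;

import org.apache.asterix.replication.functions.ReplicaFilesRequest;

public class ReplicaFilesRequestRoundTrip {
    public static void main(String[] args) throws IOException {
        // Ask for partitions 0 and 1, declaring one (hypothetical) file as already present locally.
        ReplicaFilesRequest request = new ReplicaFilesRequest(
                new HashSet<>(Arrays.asList(0, 1)),
                new HashSet<>(Arrays.asList("storage/partition_0/Social/Users/0_b-tree")));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        request.serialize(out);

        // The receiving side rebuilds the request from the same bytes.
        ReplicaFilesRequest received =
                ReplicaFilesRequest.create(new DataInputStream(new ByteArrayInputStream(out.toByteArray())));
        System.out.println(received.getPartitionIds());  // [0, 1]
        System.out.println(received.getExistingFiles()); // [storage/partition_0/Social/Users/0_b-tree]
    }
}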
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 8050e8d..044707a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -52,13 +52,16 @@
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.IReplicationThread;
import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.common.storage.IndexFileProperties;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -101,6 +104,8 @@
private final Set<Integer> nodeHostedPartitions;
private final ReplicationNotifier replicationNotifier;
private final Object flushLogslock = new Object();
+ private final IDatasetLifecycleManager dsLifecycleManager;
+ private final PersistentLocalResourceRepository localResourceRep;
public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager,
IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
@@ -111,6 +116,9 @@
this.replicationManager = replicationManager;
this.replicationProperties = replicationProperties;
this.appContextProvider = asterixAppRuntimeContextProvider;
+ this.dsLifecycleManager = asterixAppRuntimeContextProvider.getDatasetLifecycleManager();
+ this.localResourceRep = (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider
+ .getLocalResourceRepository();
lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>();
pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
@@ -118,10 +126,9 @@
lsmComponentLSNMappingService = new LSMComponentsSyncService();
replicationNotifier = new ReplicationNotifier();
replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
- Map<String, ClusterPartition[]> nodePartitions =
- ((IPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
- .getNodePartitions();
- Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(nodeId);
+ Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) asterixAppRuntimeContextProvider
+ .getAppContext()).getMetadataProperties().getNodePartitions();
+ Set<String> nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
List<Integer> clientsPartitions = new ArrayList<>();
for (String clientId : nodeReplicationClients) {
for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) {
@@ -141,8 +148,8 @@
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true);
- InetSocketAddress replicationChannelAddress =
- new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort);
+ InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP),
+ dataPort);
serverSocketChannel.socket().bind(replicationChannelAddress);
lsmComponentLSNMappingService.start();
replicationNotifier.start();
@@ -169,9 +176,8 @@
if (remainingFile == 0) {
if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
&& replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
- int remainingIndexes =
- replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes
- .decrementAndGet();
+ int remainingIndexes = replicaUniqueLSN2RemoteMapping
+ .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
if (remainingIndexes == 0) {
/**
* Note: there is a chance that this will never be removed because some
@@ -217,8 +223,8 @@
public void run() {
Thread.currentThread().setName("Replication Thread");
try {
- ReplicationRequestType replicationFunction =
- ReplicationProtocol.getRequestType(socketChannel, inBuffer);
+ ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
+ inBuffer);
while (replicationFunction != ReplicationRequestType.GOODBYE) {
switch (replicationFunction) {
case REPLICATE_LOG:
@@ -282,8 +288,8 @@
Set<Integer> datasetsToForceFlush = new HashSet<>();
for (IndexInfo iInfo : openIndexesInfo) {
if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
- AbstractLSMIOOperationCallback ioCallback =
- (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
+ AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
+ .getIOOperationCallback();
//if an index has a pending flush, then the request to flush it will succeed.
if (ioCallback.hasPendingFlush()) {
//remove index to indicate that it will be flushed
@@ -372,34 +378,40 @@
LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
List<String> filesList;
- Set<String> replicaIds = request.getReplicaIds();
+ Set<Integer> partitionIds = request.getPartitionIds();
Set<String> requesterExistingFiles = request.getExistingFiles();
- Map<String, ClusterPartition[]> nodePartitions =
- ((IPropertiesProvider) appContextProvider.getAppContext()).getMetadataProperties()
- .getNodePartitions();
- for (String replicaId : replicaIds) {
- //get replica partitions
- ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
- for (ClusterPartition partition : replicaPatitions) {
- filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false);
- //start sending files
- for (String filePath : filesList) {
- String relativeFilePath = PersistentLocalResourceRepository.getResourceRelativePath(filePath);
- //if the file already exists on the requester, skip it
- if (!requesterExistingFiles.contains(relativeFilePath)) {
- try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
- FileChannel fileChannel = fromFile.getChannel();) {
- long fileSize = fileChannel.size();
- fileProperties.initialize(filePath, fileSize, replicaId, false,
- AbstractLSMIOOperationCallback.INVALID, false);
- outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
- ReplicationRequestType.REPLICATE_FILE);
+ Map<Integer, ClusterPartition> clusterPartitions = ((IPropertiesProvider) appContextProvider
+ .getAppContext()).getMetadataProperties().getClusterPartitions();
- //send file info
- NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
- //transfer file
- NetworkingUtil.sendFile(fileChannel, socketChannel);
- }
+ final IReplicationStrategy repStrategy = replicationProperties.getReplicationStrategy();
+ // Flush replicated datasets to generate the latest LSM components
+ dsLifecycleManager.flushDataset(repStrategy);
+ for (Integer partitionId : partitionIds) {
+ ClusterPartition partition = clusterPartitions.get(partitionId);
+ filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false);
+ //start sending files
+ for (String filePath : filesList) {
+ // Send only files of datasets that are replicated.
+ IndexFileProperties indexFileRef = localResourceRep.getIndexFileRef(filePath);
+ if (!repStrategy.isMatch(indexFileRef.getDatasetId())) {
+ continue;
+ }
+ String relativeFilePath = StoragePathUtil.getIndexFileRelativePath(filePath);
+ //if the file already exists on the requester, skip it
+ if (!requesterExistingFiles.contains(relativeFilePath)) {
+ try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
+ FileChannel fileChannel = fromFile.getChannel();) {
+ long fileSize = fileChannel.size();
+ fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false,
+ AbstractLSMIOOperationCallback.INVALID, false);
+ outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
+ ReplicationRequestType.REPLICATE_FILE);
+
+ //send file info
+ NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+
+ //transfer file
+ NetworkingUtil.sendFile(fileChannel, socketChannel);
}
}
}
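The behavioral change in the file-shipping path above is that requests are now expressed per storage partition, and a file is only sent when its dataset is matched by the configured replication strategy. A standalone sketch of that filtering step, with the strategy and the dataset-id lookup reduced to functional interfaces since the full IReplicationStrategy and PersistentLocalResourceRepository APIs are not reproduced here:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

public class ReplicatedFileFilter {

    // Keeps only index files whose dataset id is matched by the strategy,
    // mirroring the repStrategy.isMatch(datasetId) check added in ReplicationChannel.
    static List<String> filterReplicated(List<String> partitionFiles,
            ToIntFunction<String> datasetIdOfFile, IntPredicate strategyMatches) {
        return partitionFiles.stream()
                .filter(file -> strategyMatches.test(datasetIdOfFile.applyAsInt(file)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical paths and dataset ids; pretend a Metadata_Only strategy matches ids below 100.
        Map<String, Integer> datasetIdByFile = new HashMap<>();
        datasetIdByFile.put("Metadata/Dataset/0_b-tree", 2);
        datasetIdByFile.put("Social/Users/0_b-tree", 101);
        List<String> files = new ArrayList<>(datasetIdByFile.keySet());
        List<String> toShip = filterReplicated(files, datasetIdByFile::get, id -> id < 100);
        System.out.println(toShip); // [Metadata/Dataset/0_b-tree]
    }
}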
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 936f5a0..72cc7d1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -50,18 +50,20 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.Replica.ReplicaState;
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.replication.ReplicationJob;
+import org.apache.asterix.common.storage.IndexFileProperties;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
@@ -101,7 +103,6 @@
private final Map<Integer, Set<String>> jobCommitAcks;
private final Map<Integer, ILogRecord> replicationJobsPendingAcks;
private ByteBuffer dataBuffer;
-
private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ;
private final LinkedBlockingQueue<ReplicaEvent> replicaEventsQ;
@@ -134,6 +135,8 @@
private Future<? extends Object> txnLogReplicatorTask;
private SocketChannel[] logsRepSockets;
private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
+ private final IReplicationStrategy replicationStrategy;
+ private final PersistentLocalResourceRepository localResourceRepo;
//TODO this class needs to be refactored by moving its private classes to separate files
//and possibly using MessageBroker to send/receive remote replicas events.
@@ -142,36 +145,38 @@
IAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
this.nodeId = nodeId;
this.replicationProperties = replicationProperties;
+ replicationStrategy = replicationProperties.getReplicationStrategy();
this.replicaResourcesManager = (ReplicaResourcesManager) remoteResoucesManager;
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
- this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
this.logManager = logManager;
+ localResourceRepo = (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider
+ .getLocalResourceRepository();
+ this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
+ replicas = new HashMap<>();
replicationJobsQ = new LinkedBlockingQueue<>();
replicaEventsQ = new LinkedBlockingQueue<>();
terminateJobsReplication = new AtomicBoolean(false);
jobsReplicationSuspended = new AtomicBoolean(true);
replicationSuspended = new AtomicBoolean(true);
- replicas = new HashMap<>();
jobCommitAcks = new ConcurrentHashMap<>();
replicationJobsPendingAcks = new ConcurrentHashMap<>();
shuttingDownReplicaIds = new HashSet<>();
dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+ replicationMonitor = new ReplicasEventsMonitor();
+ //add list of replicas from configurations (To be read from another source e.g. Zookeeper)
+ Set<Replica> replicaNodes = replicationProperties.getReplicationStrategy().getRemoteReplicas(nodeId);
//Used as async listeners from replicas
replicationListenerThreads = Executors.newCachedThreadPool();
replicationJobsProcessor = new ReplicationJobsProccessor();
- replicationMonitor = new ReplicasEventsMonitor();
- Map<String, ClusterPartition[]> nodePartitions =
- ((IPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
- .getNodePartitions();
- //add list of replicas from configurations (To be read from another source e.g. Zookeeper)
- Set<Replica> replicaNodes = replicationProperties.getRemoteReplicas(nodeId);
+ Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) asterixAppRuntimeContextProvider
+ .getAppContext()).getMetadataProperties().getNodePartitions();
replica2PartitionsMap = new HashMap<>(replicaNodes.size());
for (Replica replica : replicaNodes) {
- replicas.put(replica.getNode().getId(), replica);
+ replicas.put(replica.getId(), replica);
//for each remote replica, get the list of replication clients
- Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(replica.getId());
+ Set<String> nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(replica.getId());
//get the partitions of each client
List<Integer> clientPartitions = new ArrayList<>();
for (String clientId : nodeReplicationClients) {
@@ -255,7 +260,6 @@
getAndInitNewPage();
}
}
-
currentTxnLogBuffer.append(logRecord);
}
@@ -277,7 +281,12 @@
//all of the job's files belong to a single storage partition.
//get any of them to determine the partition from the file path.
String jobFile = job.getJobFiles().iterator().next();
- int jobPartitionId = PersistentLocalResourceRepository.getResourcePartition(jobFile);
+ IndexFileProperties indexFileRef = localResourceRepo.getIndexFileRef(jobFile);
+ if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
+ return;
+ }
+
+ int jobPartitionId = indexFileRef.getPartitionId();
ByteBuffer responseBuffer = null;
LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
@@ -442,7 +451,7 @@
@Override
public boolean isReplicationEnabled() {
- return ClusterProperties.INSTANCE.isReplicationEnabled();
+ return replicationProperties.isParticipant(nodeId);
}
@Override
@@ -822,7 +831,6 @@
}
}
}
-
//presume replicated
return true;
}
@@ -908,18 +916,37 @@
public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
//stop replication thread after all jobs/logs have been processed
suspendReplication(false);
- //send shutdown event to remote replicas
- sendShutdownNotifiction();
- //wait until all shutdown events come from all remote replicas
- synchronized (shuttingDownReplicaIds) {
- while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
- try {
- shuttingDownReplicaIds.wait(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+
+ /**
+ * If this node has any remote replicas, it needs to inform them
+ * that it is shutting down.
+ */
+ if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) {
+ //send shutdown event to remote replicas
+ sendShutdownNotifiction();
+ }
+
+ /**
+ * If this node has any remote primary replicas, then it needs to wait
+ * until all of them send the shutdown notification.
+ */
+ // find active remote primary replicas
+ Set<String> activeRemotePrimaryReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+ .map(Replica::getId).filter(getActiveReplicasIds()::contains).collect(Collectors.toSet());
+
+ if (!activeRemotePrimaryReplicas.isEmpty()) {
+ //wait until all shutdown events come from all remote primary replicas
+ synchronized (shuttingDownReplicaIds) {
+ while (!shuttingDownReplicaIds.containsAll(activeRemotePrimaryReplicas)) {
+ try {
+ shuttingDownReplicaIds.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}
}
+
LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
//close replication channel
asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
@@ -996,6 +1023,9 @@
public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException {
long startLSN = logManager.getAppendLSN();
Set<String> replicaIds = getActiveReplicasIds();
+ if (replicaIds.isEmpty()) {
+ return;
+ }
ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
for (String replicaId : replicaIds) {
//1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
@@ -1079,9 +1109,9 @@
//Recovery Method
@Override
- public void requestReplicaFiles(String selectedReplicaId, Set<String> replicasDataToRecover,
+ public void requestReplicaFiles(String selectedReplicaId, Set<Integer> partitionsToRecover,
Set<String> existingFiles) throws IOException {
- ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover, existingFiles);
+ ReplicaFilesRequest request = new ReplicaFilesRequest(partitionsToRecover, existingFiles);
dataBuffer = ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) {
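A subtle point in the new stop(...) logic above: the node only waits for shutdown notifications from remote primary replicas that are currently active, computed with a stream over the strategy's replica set. A minimal standalone sketch of that set computation (Replica is reduced to an id holder here; the real class carries more state):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ActivePrimaryReplicas {

    // Stand-in for org.apache.asterix.common.replication.Replica, reduced to an id.
    static final class Replica {
        private final String id;
        Replica(String id) { this.id = id; }
        String getId() { return id; }
    }

    // Same shape as the stream in ReplicationManager.stop(): map to ids, keep only active ones.
    static Set<String> activeRemotePrimaries(List<Replica> remotePrimaries, Set<String> activeReplicaIds) {
        return remotePrimaries.stream()
                .map(Replica::getId)
                .filter(activeReplicaIds::contains)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Replica> primaries = Arrays.asList(new Replica("asterix_nc1"), new Replica("asterix_nc2"));
        Set<String> active = new HashSet<>(Arrays.asList("asterix_nc2"));
        System.out.println(activeRemotePrimaries(primaries, active)); // [asterix_nc2]
    }
}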
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 0dcdc7b..fcc997f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -27,13 +27,14 @@
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicationManager;
@@ -41,6 +42,7 @@
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class RemoteRecoveryManager implements IRemoteRecoveryManager {
@@ -61,8 +63,7 @@
//1. identify which replicas reside in this node
String localNodeId = runtimeContext.getTransactionSubsystem().getId();
- Set<String> nodes = replicationProperties.getNodeReplicationClients(localNodeId);
-
+ Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId);
Map<String, Set<String>> recoveryCandidates = new HashMap<>();
Map<String, Integer> candidatesScore = new HashMap<>();
@@ -124,16 +125,9 @@
}
@Override
- public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException {
- /**
- * TODO even though the takeover is always expected to succeed,
- * in case of any failure during the takeover, the CC should be
- * notified that the takeover failed.
- */
- Set<Integer> partitionsToTakeover = new HashSet<>(Arrays.asList(partitions));
+ public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException {
ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
-
- long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitionsToTakeover);
+ long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
long readableSmallestLSN = logManager.getReadableSmallestLSN();
if (minLSN < readableSmallestLSN) {
minLSN = readableSmallestLSN;
@@ -141,7 +135,25 @@
//replay logs > minLSN that belong to these partitions
IRecoveryManager recoveryManager = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- recoveryManager.replayPartitionsLogs(partitionsToTakeover, logManager.getLogReader(true), minLSN);
+ try {
+ recoveryManager.replayPartitionsLogs(partitions, logManager.getLogReader(true), minLSN);
+ if (flush) {
+ runtimeContext.getDatasetLifecycleManager().flushAllDatasets();
+ }
+ } catch (IOException | ACIDException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException {
+ /**
+ * TODO even though the takeover is always expected to succeed,
+ * in case of any failure during the takeover, the CC should be
+ * notified that the takeover failed.
+ */
+ Set<Integer> partitionsToTakeover = new HashSet<>(Arrays.asList(partitions));
+ replayReplicaPartitionLogs(partitionsToTakeover, false);
//mark these partitions as active in this node
PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
@@ -157,8 +169,9 @@
PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
.getLocalResourceRepository();
IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
+ Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) runtimeContext).getMetadataProperties()
+ .getNodePartitions();
- failbackRecoveryReplicas = new HashMap<>();
while (true) {
//start recovery steps
try {
@@ -189,10 +202,15 @@
/*** Start Recovery Per Lost Replica ***/
for (Entry<String, Set<String>> remoteReplica : failbackRecoveryReplicas.entrySet()) {
String replicaId = remoteReplica.getKey();
- Set<String> partitionsToRecover = remoteReplica.getValue();
+ Set<String> ncsToRecoverFor = remoteReplica.getValue();
+ Set<Integer> partitionsIds = new HashSet<>();
+ for (String node : ncsToRecoverFor) {
+ partitionsIds.addAll((Arrays.asList(nodePartitions.get(node))).stream()
+ .map(ClusterPartition::getPartitionId).collect(Collectors.toList()));
+ }
//1. Request indexes metadata and LSM components
- replicationManager.requestReplicaFiles(replicaId, partitionsToRecover, new HashSet<String>());
+ replicationManager.requestReplicaFiles(replicaId, partitionsIds, new HashSet<String>());
}
break;
} catch (IOException e) {
@@ -209,8 +227,8 @@
ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext
.getReplicaResourcesManager();
- Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) runtimeContext)
- .getMetadataProperties().getNodePartitions();
+ Map<String, ClusterPartition[]> nodePartitions = ((IPropertiesProvider) runtimeContext).getMetadataProperties()
+ .getNodePartitions();
/**
* for each lost partition, get the remaining files from replicas
@@ -221,17 +239,19 @@
String replicaId = remoteReplica.getKey();
Set<String> NCsDataToRecover = remoteReplica.getValue();
Set<String> existingFiles = new HashSet<>();
+ Set<Integer> partitionsToRecover = new HashSet<>();
for (String nodeId : NCsDataToRecover) {
//get partitions that will be recovered from this node
ClusterPartition[] replicaPartitions = nodePartitions.get(nodeId);
for (ClusterPartition partition : replicaPartitions) {
existingFiles.addAll(
replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), true));
+ partitionsToRecover.add(partition.getPartitionId());
}
}
//Request remaining indexes files
- replicationManager.requestReplicaFiles(replicaId, NCsDataToRecover, existingFiles);
+ replicationManager.requestReplicaFiles(replicaId, partitionsToRecover, existingFiles);
}
} catch (IOException e) {
/**
@@ -256,4 +276,52 @@
failbackRecoveryReplicas = null;
}
-}
+
+ //TODO refactor common code between remote recovery and failback process
+ @Override
+ public void doRemoteRecoveryPlan(Map<String, Set<Integer>> recoveryPlan) throws HyracksDataException {
+ int maxRecoveryAttempts = replicationProperties.getMaxRemoteRecoveryAttempts();
+ PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
+ .getLocalResourceRepository();
+ IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
+ ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
+ while (true) {
+ //start recovery steps
+ try {
+ if (maxRecoveryAttempts <= 0) {
+ //to avoid infinite loop in case of unexpected behavior.
+ throw new IllegalStateException("Failed to perform remote recovery.");
+ }
+
+ /*** Prepare for Recovery ***/
+ //1. clean any memory data that could've existed from previous failed recovery attempt
+ datasetLifeCycleManager.closeAllDatasets();
+
+ //2. remove any existing storage data and initialize storage metadata
+ resourceRepository.deleteStorageData(true);
+ resourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
+
+ /*** Start Recovery Per Lost Replica ***/
+ for (Entry<String, Set<Integer>> remoteReplica : recoveryPlan.entrySet()) {
+ String replicaId = remoteReplica.getKey();
+ Set<Integer> partitionsToRecover = remoteReplica.getValue();
+
+ //Request indexes metadata and LSM components
+ replicationManager.requestReplicaFiles(replicaId, partitionsToRecover, new HashSet<String>());
+ }
+
+ //get max LSN from selected remote replicas
+ long maxRemoteLSN = replicationManager.getMaxRemoteLSN(recoveryPlan.keySet());
+
+ //6. force LogManager to start from a partition > maxLSN in selected remote replicas
+ logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
+ break;
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...", e);
+ }
+ maxRecoveryAttempts--;
+ }
+ }
+ }
+}
\ No newline at end of file
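doRemoteRecoveryPlan(...) takes a Map<String, Set<Integer>> keyed by the replica to recover from, mapped to the storage partition ids to fetch from it. A hedged sketch of assembling and inspecting such a plan (node and partition ids are invented; in the actual change the plan is presumably produced by the fault-tolerance strategy on the CC and handed to the recovering NC):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RecoveryPlanExample {
    public static void main(String[] args) {
        // Recover partitions 0 and 1 from asterix_nc2 (illustrative ids only).
        Map<String, Set<Integer>> recoveryPlan = new HashMap<>();
        recoveryPlan.put("asterix_nc2", new HashSet<>(Arrays.asList(0, 1)));

        // The recovering NC would then execute the plan, e.g.:
        // remoteRecoveryManager.doRemoteRecoveryPlan(recoveryPlan);
        recoveryPlan.forEach((replica, partitions) ->
                System.out.println("recover partitions " + partitions + " from " + replica));
    }
}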
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index cce3dc4..6a2ebf6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,8 +38,8 @@
import java.util.logging.Logger;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
@@ -285,7 +285,7 @@
partitionFiles.add(file.getAbsolutePath());
} else {
partitionFiles.add(
- PersistentLocalResourceRepository.getResourceRelativePath(file.getAbsolutePath()));
+ StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath()));
}
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
index 7c386d5..b2855f6 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
@@ -18,9 +18,9 @@
*/
package org.apache.asterix.runtime.message;
-import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.replication.INCLifecycleMessage;
-public abstract class AbstractFailbackPlanMessage implements IApplicationMessage {
+public abstract class AbstractFailbackPlanMessage implements INCLifecycleMessage {
private static final long serialVersionUID = 1L;
protected final long planId;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
index 355f503..6193931 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/AppContextInfo.java
@@ -18,33 +18,23 @@
*/
package org.apache.asterix.runtime.utils;
-import java.io.IOException;
-import java.util.function.Supplier;
-import java.util.logging.Logger;
-
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
-import org.apache.asterix.common.config.BuildProperties;
-import org.apache.asterix.common.config.CompilerProperties;
-import org.apache.asterix.common.config.ExtensionProperties;
-import org.apache.asterix.common.config.ExternalProperties;
-import org.apache.asterix.common.config.FeedProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.config.PropertiesAccessor;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.config.StorageProperties;
-import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.config.*;
import org.apache.asterix.common.dataflow.IApplicationContextInfo;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.common.IStorageManager;
+import java.io.IOException;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+
/*
* Acts as a holder class for IndexRegistryProvider, AsterixStorageManager
* instances that are accessed from the NCs. In addition an instance of ICCApplicationContext
@@ -71,13 +61,15 @@
private IHyracksClientConnection hcc;
private Object extensionManager;
private volatile boolean initialized = false;
+ private IFaultToleranceStrategy ftStrategy;
private AppContextInfo() {
}
public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
ILibraryManager libraryManager, IResourceIdManager resourceIdManager,
- Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager)
+ Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager,
+ IFaultToleranceStrategy ftStrategy)
throws AsterixException, IOException {
if (INSTANCE.initialized) {
throw new AsterixException(AppContextInfo.class.getSimpleName() + " has been initialized already");
@@ -98,6 +90,7 @@
INSTANCE.feedProperties = new FeedProperties(propertiesAccessor);
INSTANCE.extensionProperties = new ExtensionProperties(propertiesAccessor);
INSTANCE.replicationProperties = new ReplicationProperties(propertiesAccessor);
+ INSTANCE.ftStrategy = ftStrategy;
INSTANCE.hcc = hcc;
INSTANCE.buildProperties = new BuildProperties(propertiesAccessor);
INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor);
@@ -205,4 +198,8 @@
public IMetadataBootstrap getMetadataBootstrap() {
return metadataBootstrapSupplier.get();
}
+
+ public IFaultToleranceStrategy getFaultToleranceStrategy() {
+ return ftStrategy;
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 8fffdb1..d975a98 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -19,50 +19,36 @@
package org.apache.asterix.runtime.utils;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.logging.Level;
import java.util.logging.Logger;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
-import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
-import org.apache.asterix.runtime.message.NodeFailbackPlan;
-import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
-import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* A holder class for properties related to the Asterix cluster.
*/
-public class ClusterStateManager {
+public class ClusterStateManager implements IClusterStateManager {
/*
* TODO: currently after instance restarts we require all nodes to join again,
* otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance
@@ -71,9 +57,8 @@
private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName());
public static final ClusterStateManager INSTANCE = new ClusterStateManager();
- private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
private static final String IO_DEVICES = "iodevices";
- private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
+ private final Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
private final Cluster cluster;
private ClusterState state = ClusterState.UNUSABLE;
@@ -84,32 +69,21 @@
private Map<String, ClusterPartition[]> node2PartitionsMap = null;
private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
- private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
- private long clusterRequestId = 0;
private String currentMetadataNode = null;
private boolean metadataNodeActive = false;
- private boolean autoFailover = false;
- private boolean replicationEnabled = false;
private Set<String> failedNodes = new HashSet<>();
- private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
- private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+ private IFaultToleranceStrategy ftStrategy;
private ClusterStateManager() {
cluster = ClusterProperties.INSTANCE.getCluster();
// if this is the CC process
- if (AppContextInfo.INSTANCE.initialized()
- && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+ if (AppContextInfo.INSTANCE.initialized() && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
node2PartitionsMap = AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
clusterPartitions = AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
currentMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
- replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
- autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
- if (autoFailover) {
- pendingTakeoverRequests = new HashMap<>();
- pendingProcessingFailbackPlans = new LinkedList<>();
- planId2FailbackPlanMap = new HashMap<>();
- }
+ ftStrategy = AppContextInfo.INSTANCE.getFaultToleranceStrategy();
+ ftStrategy.bindTo(this);
}
}
@@ -117,30 +91,8 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Removing configuration parameters for node id " + nodeId);
}
- activeNcConfiguration.remove(nodeId);
-
- //if this node was waiting for failback and failed before it completed
- if (failedNodes.contains(nodeId)) {
- if (autoFailover) {
- notifyFailbackPlansNodeFailure(nodeId);
- revertFailedFailbackPlanEffects();
- }
- } else {
- //an active node failed
- failedNodes.add(nodeId);
- if (nodeId.equals(currentMetadataNode)) {
- metadataNodeActive = false;
- LOGGER.info("Metadata node is now inactive");
- }
- updateNodePartitions(nodeId, false);
- if (replicationEnabled) {
- notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
- if (autoFailover) {
- notifyFailbackPlansNodeFailure(nodeId);
- requestPartitionsTakeover(nodeId);
- }
- }
- }
+ failedNodes.add(nodeId);
+ ftStrategy.notifyNodeFailure(nodeId);
}
public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration)
@@ -149,46 +101,51 @@
LOGGER.info("Registering configuration parameters for node id " + nodeId);
}
activeNcConfiguration.put(nodeId, configuration);
-
- //a node trying to come back after failure
- if (failedNodes.contains(nodeId)) {
- if (autoFailover) {
- prepareFailbackPlan(nodeId);
- return;
- } else {
- //a node completed local or remote recovery and rejoined
- failedNodes.remove(nodeId);
- if (replicationEnabled) {
- //notify other replica to reconnect to this node
- notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
- }
- }
- }
-
- if (nodeId.equals(currentMetadataNode)) {
- metadataNodeActive = true;
- LOGGER.info("Metadata node is now active");
- }
- updateNodePartitions(nodeId, true);
+ failedNodes.remove(nodeId);
+ ftStrategy.notifyNodeJoin(nodeId);
}
- private synchronized void updateNodePartitions(String nodeId, boolean added) throws HyracksDataException {
+ @Override
+ public synchronized void setState(ClusterState state) {
+ this.state = state;
+ LOGGER.info("Cluster State is now " + state.name());
+ }
+
+ @Override
+ public void updateMetadataNode(String nodeId, boolean active) {
+ currentMetadataNode = nodeId;
+ metadataNodeActive = active;
+ if (active) {
+ LOGGER.info(String.format("Metadata node %s is now active", currentMetadataNode));
+ }
+ }
+
+ @Override
+ public synchronized void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException {
ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
// if this isn't a storage node, it will not have cluster partitions
if (nodePartitions != null) {
for (ClusterPartition p : nodePartitions) {
- // set the active node for this node's partitions
- p.setActive(added);
- if (added) {
- p.setActiveNodeId(nodeId);
- }
+ updateClusterPartition(p.getPartitionId(), nodeId, active);
}
- resetClusterPartitionConstraint();
- updateClusterState();
}
}
- private synchronized void updateClusterState() throws HyracksDataException {
+ @Override
+ public synchronized void updateClusterPartition(Integer partitionNum, String activeNode, boolean active) {
+ ClusterPartition clusterPartition = clusterPartitions.get(partitionNum);
+ if (clusterPartition != null) {
+ // set the active node for this node's partitions
+ clusterPartition.setActive(active);
+ if (active) {
+ clusterPartition.setActiveNodeId(activeNode);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void refreshState() throws HyracksDataException {
+ resetClusterPartitionConstraint();
for (ClusterPartition p : clusterPartitions.values()) {
if (!p.isActive()) {
state = ClusterState.UNUSABLE;
@@ -196,20 +153,19 @@
return;
}
}
+
+ state = ClusterState.PENDING;
+ LOGGER.info("Cluster is now " + state);
+
// if all storage partitions are active as well as the metadata node, then the cluster is active
if (metadataNodeActive) {
- state = ClusterState.PENDING;
- LOGGER.info("Cluster is now " + state);
AppContextInfo.INSTANCE.getMetadataBootstrap().init();
state = ClusterState.ACTIVE;
LOGGER.info("Cluster is now " + state);
+ // Notify any waiting threads for the cluster to be active.
+ notifyAll();
// start global recovery
AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
- if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
- processPendingFailbackPlans();
- }
- } else {
- requestMetadataNodeTakeover();
}
}
@@ -232,6 +188,7 @@
return ncConfig.get(IO_DEVICES).split(",");
}
+ @Override
public ClusterState getState() {
return state;
}
@@ -287,6 +244,7 @@
return AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
}
+ @Override
public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
return node2PartitionsMap.get(nodeId);
}
@@ -298,6 +256,7 @@
return 0;
}
+ @Override
public synchronized ClusterPartition[] getClusterPartitons() {
ArrayList<ClusterPartition> partitons = new ArrayList<>();
for (ClusterPartition partition : clusterPartitions.values()) {
@@ -306,331 +265,6 @@
return partitons.toArray(new ClusterPartition[] {});
}
- private synchronized void requestPartitionsTakeover(String failedNodeId) {
- //replica -> list of partitions to takeover
- Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
- ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
-
- //collect the partitions of the failed NC
- List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
- if (!lostPartitions.isEmpty()) {
- for (ClusterPartition partition : lostPartitions) {
- //find replicas for this partitions
- Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
- //find a replica that is still active
- for (String replica : partitionReplicas) {
- //TODO (mhubail) currently this assigns the partition to the first found active replica.
- //It needs to be modified to consider load balancing.
- if (addActiveReplica(replica, partition, partitionRecoveryPlan)) {
- break;
- }
- }
- }
-
- if (partitionRecoveryPlan.size() == 0) {
- //no active replicas were found for the failed node
- LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
- return;
- } else {
- LOGGER.info("Partitions to recover: " + lostPartitions);
- }
- ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
- .getMessageBroker();
- //For each replica, send a request to takeover the assigned partitions
- for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
- String replica = entry.getKey();
- Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
- long requestId = clusterRequestId++;
- TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
- replica, partitionsToTakeover);
- pendingTakeoverRequests.put(requestId, takeoverRequest);
- try {
- messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
- } catch (Exception e) {
- /**
- * if we fail to send the request, it means the NC we tried to send the request to
- * has failed. When the failure notification arrives, we will send any pending request
- * that belongs to the failed NC to a different active replica.
- */
- LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
- }
- }
- }
- }
-
- private boolean addActiveReplica(String replica, ClusterPartition partition,
- Map<String, List<Integer>> partitionRecoveryPlan) {
- if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
- if (!partitionRecoveryPlan.containsKey(replica)) {
- List<Integer> replicaPartitions = new ArrayList<>();
- replicaPartitions.add(partition.getPartitionId());
- partitionRecoveryPlan.put(replica, replicaPartitions);
- } else {
- partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
- }
- return true;
- }
- return false;
- }
-
- private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
- List<ClusterPartition> nodePartitions = new ArrayList<>();
- for (ClusterPartition partition : clusterPartitions.values()) {
- if (partition.getActiveNodeId().equals(nodeId)) {
- nodePartitions.add(partition);
- }
- }
- /**
- * if there is any pending takeover request that this node was supposed to handle,
- * it needs to be sent to a different replica
- */
- List<Long> failedTakeoverRequests = new ArrayList<>();
- for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
- if (request.getNodeId().equals(nodeId)) {
- for (Integer partitionId : request.getPartitions()) {
- nodePartitions.add(clusterPartitions.get(partitionId));
- }
- failedTakeoverRequests.add(request.getRequestId());
- }
- }
-
- //remove failed requests
- for (Long requestId : failedTakeoverRequests) {
- pendingTakeoverRequests.remove(requestId);
- }
- return nodePartitions;
- }
-
- private synchronized void requestMetadataNodeTakeover() {
- //need a new node to takeover metadata node
- ClusterPartition metadataPartiton = AppContextInfo.INSTANCE.getMetadataProperties()
- .getMetadataPartition();
- //request the metadataPartition node to register itself as the metadata node
- TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
- ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
- .getMessageBroker();
- try {
- messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
- } catch (Exception e) {
- /**
- * if we fail to send the request, it means the NC we tried to send the request to
- * has failed. When the failure notification arrives, a new NC will be assigned to
- * the metadata partition and a new metadata node takeover request will be sent to it.
- */
- LOGGER.log(Level.WARNING,
- "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
- }
- }
-
- public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
- throws HyracksDataException {
- for (Integer partitonId : response.getPartitions()) {
- ClusterPartition partition = clusterPartitions.get(partitonId);
- partition.setActive(true);
- partition.setActiveNodeId(response.getNodeId());
- }
- pendingTakeoverRequests.remove(response.getRequestId());
- resetClusterPartitionConstraint();
- updateClusterState();
- }
-
- public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response)
- throws HyracksDataException {
- currentMetadataNode = response.getNodeId();
- metadataNodeActive = true;
- LOGGER.info("Current metadata node: " + currentMetadataNode);
- updateClusterState();
- }
-
- private synchronized void prepareFailbackPlan(String failingBackNodeId) {
- NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
- pendingProcessingFailbackPlans.add(plan);
- planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
- //get all partitions this node requires to resync
- ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
- Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
- for (String replicaId : nodeReplicas) {
- ClusterPartition[] nodePartitions = node2PartitionsMap.get(replicaId);
- for (ClusterPartition partition : nodePartitions) {
- plan.addParticipant(partition.getActiveNodeId());
- /**
- * if the partition original node is the returning node,
- * add it to the list of the partitions which will be failed back
- */
- if (partition.getNodeId().equals(failingBackNodeId)) {
- plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
- }
- }
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Prepared Failback plan: " + plan.toString());
- }
-
- processPendingFailbackPlans();
- }
-
- private synchronized void processPendingFailbackPlans() {
- /**
- * if the cluster state is not ACTIVE, then failbacks should not be processed
- * since some partitions are not active
- */
- if (state == ClusterState.ACTIVE) {
- while (!pendingProcessingFailbackPlans.isEmpty()) {
- //take the first pending failback plan
- NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
- /**
- * A plan at this stage will be in one of two states:
- * 1. PREPARING -> the participants were selected but we haven't sent any request.
- * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
- */
- if (plan.getState() == FailbackPlanState.PREPARING) {
- //set the partitions that will be failed back as inactive
- String failbackNode = plan.getNodeId();
- for (Integer partitionId : plan.getPartitionsToFailback()) {
- ClusterPartition clusterPartition = clusterPartitions.get(partitionId);
- clusterPartition.setActive(false);
- //partition expected to be returned to the failing back node
- clusterPartition.setActiveNodeId(failbackNode);
- }
-
- /**
- * if the returning node is the original metadata node,
- * then metadata node will change after the failback completes
- */
- String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties()
- .getMetadataNodeName();
- if (originalMetadataNode.equals(failbackNode)) {
- plan.setNodeToReleaseMetadataManager(currentMetadataNode);
- currentMetadataNode = "";
- metadataNodeActive = false;
- }
-
- //force new jobs to wait
- state = ClusterState.REBALANCING;
- ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE
- .getCCApplicationContext().getMessageBroker();
- handleFailbackRequests(plan, messageBroker);
- /**
- * wait until the current plan is completed before processing the next plan.
- * when the current one completes or is reverted, the cluster state will be
- * ACTIVE again, and the next failback plan (if any) will be processed.
- */
- break;
- } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
- //this plan failed before sending any requests -> nothing to rollback
- planId2FailbackPlanMap.remove(plan.getPlanId());
- }
- }
- }
- }
-
- private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
- //send requests to other nodes to complete on-going jobs and prepare partitions for failback
- for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
- try {
- messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
- plan.addPendingRequest(request);
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
- plan.notifyNodeFailure(request.getNodeID());
- revertFailedFailbackPlanEffects();
- break;
- }
- }
- }
-
- public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
- NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
- plan.markRequestCompleted(msg.getRequestId());
- /**
- * A plan at this stage will be in one of three states:
- * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
- * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
- * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
- */
- if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
- CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
-
- //send complete resync and takeover partitions to the failing back node
- ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
- .getMessageBroker();
- try {
- messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
- notifyFailbackPlansNodeFailure(request.getNodeId());
- revertFailedFailbackPlanEffects();
- }
- } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
- revertFailedFailbackPlanEffects();
- }
- }
-
- public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
- throws HyracksDataException {
- /**
- * the failback plan completed successfully:
- * Remove all references to it.
-     * Remove the failing back node from the failed nodes list.
- * Notify its replicas to reconnect to it.
- * Set the failing back node partitions as active.
- */
- NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
- String nodeId = plan.getNodeId();
- failedNodes.remove(nodeId);
- //notify impacted replicas they can reconnect to this node
- notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
- updateNodePartitions(nodeId, true);
- }
-
- private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
- ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
- Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
- String nodeIdAddress = "";
- //in case the node joined with a new IP address, we need to send it to the other replicas
- if (event == ClusterEventType.NODE_JOIN) {
- nodeIdAddress = activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
- }
-
- ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
- ICCMessageBroker messageBroker = (ICCMessageBroker) AppContextInfo.INSTANCE.getCCApplicationContext()
- .getMessageBroker();
- for (String replica : remoteReplicas) {
- //if the remote replica is alive, send the event
- if (activeNcConfiguration.containsKey(replica)) {
- try {
- messageBroker.sendApplicationMessageToNC(msg, replica);
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
- }
- }
- }
- }
-
- private synchronized void revertFailedFailbackPlanEffects() {
- Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
- while (iterator.hasNext()) {
- NodeFailbackPlan plan = iterator.next();
- if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
- //TODO if the failing back node is still active, notify it to construct a new plan for it
- iterator.remove();
-
- //reassign the partitions that were supposed to be failed back to an active replica
- requestPartitionsTakeover(plan.getNodeId());
- }
- }
- }
-
- private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
- Iterator<NodeFailbackPlan> iterator = planId2FailbackPlanMap.values().iterator();
- while (iterator.hasNext()) {
- NodeFailbackPlan plan = iterator.next();
- plan.notifyNodeFailure(nodeId);
- }
- }
-
public synchronized boolean isMetadataNodeActive() {
return metadataNodeActive;
}
@@ -676,4 +310,14 @@
stateDescription.putPOJO("partitions", clusterPartitions);
return stateDescription;
}
+
+ @Override
+ public Map<String, Map<String, String>> getActiveNcConfiguration() {
+ return Collections.unmodifiableMap(activeNcConfiguration);
+ }
+
+ @Override
+ public String getCurrentMetadataNodeId() {
+ return currentMetadataNode;
+ }
}
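
Note on the two accessors added above: the takeover/failback machinery removed in this hunk is superseded by the pluggable fault-tolerance strategies introduced by this change, and these getters expose just enough cluster state (the live NC configuration and the current metadata node) for such a strategy to make its own decisions. The snippet below is a minimal sketch, not part of this patch: the class name TakeoverCandidateSelector and the helper pickMetadataTakeoverCandidate are hypothetical, and the interface that the @Override methods satisfy is not shown in this hunk.

    import java.util.Map;

    // Hypothetical helper, for illustration only; relies solely on the two accessors added above.
    final class TakeoverCandidateSelector {
        private TakeoverCandidateSelector() {
        }

        /**
         * Picks any live NC other than the current metadata node as a takeover
         * candidate, or returns null if no such node exists.
         */
        static String pickMetadataTakeoverCandidate(Map<String, Map<String, String>> activeNcConfiguration,
                String currentMetadataNodeId) {
            for (String nodeId : activeNcConfiguration.keySet()) {
                if (!nodeId.equals(currentMetadataNodeId)) {
                    return nodeId;
                }
            }
            return null;
        }
    }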
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index f7023b3..b21eb29 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -45,6 +45,8 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.ReplicationJob;
+import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.transactions.Resource;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.commons.io.FileUtils;
@@ -483,20 +485,27 @@
nodeActivePartitions.remove(partitonId);
}
- /**
- * @param resourceAbsolutePath
- * @return the resource relative path starting from the partition directory
- */
- public static String getResourceRelativePath(String resourceAbsolutePath) {
- String[] tokens = resourceAbsolutePath.split(File.separator);
- //partition/dataverse/idx/fileName
- return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
- + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+ private static String getLocalResourceRelativePath(String absolutePath) {
+ final String[] tokens = absolutePath.split(File.separator);
+ // Format: storage_dir/partition/dataverse/idx
+ return tokens[tokens.length - 5] + File.separator + tokens[tokens.length - 4] + File.separator
+ + tokens[tokens.length - 3] + File.separator + tokens[tokens.length - 2];
}
- public static int getResourcePartition(String resourceAbsolutePath) {
- String[] tokens = resourceAbsolutePath.split(File.separator);
- //partition/dataverse/idx/fileName
- return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 4]);
+ public IndexFileProperties getIndexFileRef(String absoluteFilePath) throws HyracksDataException {
+ //TODO pass relative path
+ final String[] tokens = absoluteFilePath.split(File.separator);
+ if (tokens.length < 5) {
+ throw new HyracksDataException("Invalid file format");
+ }
+ String fileName = tokens[tokens.length - 1];
+ String index = tokens[tokens.length - 2];
+ String dataverse = tokens[tokens.length - 3];
+ String partition = tokens[tokens.length - 4];
+ int partitionId = StoragePathUtil.getPartitionNumFromName(partition);
+ String relativePath = getLocalResourceRelativePath(absoluteFilePath);
+ final LocalResource lr = get(relativePath);
+ int datasetId = lr == null ? -1 : ((Resource) lr.getResource()).datasetId();
+ return new IndexFileProperties(partitionId, dataverse, index, fileName, datasetId);
}
}
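
For reference, the new getIndexFileRef walks the absolute path backwards: the last token is the file name, then the index, dataverse, and partition directories, with the storage directory one level above that; getLocalResourceRelativePath keeps everything but the file name. The standalone sketch below reproduces only that token arithmetic with a made-up path purely for illustration; it does not perform the LocalResource lookup, which needs the repository itself.

    import java.io.File;

    // Illustration of the token positions used by getIndexFileRef/getLocalResourceRelativePath.
    public class PathTokenDemo {
        public static void main(String[] args) {
            // Hypothetical layout: .../storage/partition_0/MyDataverse/MyIndex/someFile
            String absolutePath = String.join(File.separator,
                    "home", "asterix", "storage", "partition_0", "MyDataverse", "MyIndex", "someFile");
            String[] tokens = absolutePath.split(File.separator);
            String fileName = tokens[tokens.length - 1];   // someFile
            String index = tokens[tokens.length - 2];      // MyIndex
            String dataverse = tokens[tokens.length - 3];  // MyDataverse
            String partition = tokens[tokens.length - 4];  // partition_0
            String storageDir = tokens[tokens.length - 5]; // storage
            // Relative path used to look up the LocalResource: storage/partition_0/MyDataverse/MyIndex
            String relativePath = storageDir + File.separator + partition + File.separator
                    + dataverse + File.separator + index;
            System.out.println(fileName + " -> " + relativePath);
        }
    }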
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index a248f77..d885f00 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,8 +18,12 @@
*/
package org.apache.asterix.transaction.management.service.logging;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
@@ -30,15 +34,37 @@
public class LogManagerWithReplication extends LogManager {
private IReplicationManager replicationManager;
+ private final IReplicationStrategy replicationStrategy;
+ private final Set<Integer> replicatedJob = ConcurrentHashMap.newKeySet();
- public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) {
+ public LogManagerWithReplication(ITransactionSubsystem txnSubsystem, IReplicationStrategy replicationStrategy) {
super(txnSubsystem);
+ this.replicationStrategy = replicationStrategy;
}
@Override
public void log(ILogRecord logRecord) throws ACIDException {
- //only locally generated logs should be replicated
- logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT);
+ boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
+ if (shouldReplicate) {
+ switch (logRecord.getLogType()) {
+ case LogType.ENTITY_COMMIT:
+ case LogType.UPSERT_ENTITY_COMMIT:
+ case LogType.UPDATE:
+ case LogType.FLUSH:
+ shouldReplicate = replicationStrategy.isMatch(logRecord.getDatasetId());
+ if (shouldReplicate && !replicatedJob.contains(logRecord.getJobId())) {
+ replicatedJob.add(logRecord.getJobId());
+ }
+ break;
+ case LogType.JOB_COMMIT:
+ case LogType.ABORT:
+ shouldReplicate = replicatedJob.remove(logRecord.getJobId());
+ break;
+ default:
+ shouldReplicate = false;
+ }
+ }
+ logRecord.setReplicated(shouldReplicate);
//Remote flush logs do not need to be flushed separately since they may not trigger local flush
if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
@@ -74,7 +100,8 @@
}
//wait for job Commit/Abort ACK from replicas
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
+ if (logRecord.isReplicated() && (logRecord.getLogType() == LogType.JOB_COMMIT
+ || logRecord.getLogType() == LogType.ABORT)) {
while (!replicationManager.hasBeenReplicated(logRecord)) {
try {
logRecord.wait();
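
The replication decision now funnels through IReplicationStrategy.isMatch(datasetId): entity commits, upserts, updates, and flushes are replicated only if the strategy claims the dataset, and the job id is remembered so that the matching JOB_COMMIT/ABORT record is replicated, and waited on, as well. The sketch below mirrors only the isMatch call used above; the real IReplicationStrategy interface may declare additional methods, and a strategy keyed on a fixed set of dataset ids is a hypothetical example, not one of the built-in strategies added by this change.

    import java.util.Set;

    // Hypothetical strategy: replicate only an explicitly configured set of datasets.
    public class FixedDatasetsReplicationStrategy {
        private final Set<Integer> replicatedDatasetIds;

        public FixedDatasetsReplicationStrategy(Set<Integer> replicatedDatasetIds) {
            this.replicatedDatasetIds = replicatedDatasetIds;
        }

        // Same shape as the isMatch(datasetId) call made by LogManagerWithReplication.log().
        public boolean isMatch(int datasetId) {
            return replicatedDatasetIds.contains(datasetId);
        }
    }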
diff --git a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
index 40df685..d606f79 100644
--- a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
+++ b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNInstanceUtil.java
@@ -26,6 +26,7 @@
import org.apache.asterix.aoya.Utils;
import org.apache.asterix.event.schema.yarnCluster.Cluster;
import org.apache.asterix.event.schema.yarnCluster.Node;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -37,6 +38,7 @@
public class AsterixYARNInstanceUtil {
private static final String PATH_ACTUAL = "ittest/";
private static final String INSTANCE_NAME = "asterix-integration-test";
+ private static final String TXN_LOG_PATH = "/tmp/asterix-yarn";
private MiniYARNCluster miniCluster;
private YarnConfiguration appConf;
public String aoyaHome;
@@ -120,4 +122,11 @@
outdir.delete();
}
}
+
+ public static void cleanUp() {
+ File txnLogFile = new File(TXN_LOG_PATH);
+ if (txnLogFile.exists()) {
+ FileUtils.deleteQuietly(txnLogFile);
+ }
+ }
}
diff --git a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java
index 21c1e18..157cb12 100644
--- a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java
+++ b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLibraryTestIT.java
@@ -48,6 +48,7 @@
@BeforeClass
public static void setUp() throws Exception {
+ AsterixYARNInstanceUtil.cleanUp();
instance = new AsterixYARNInstanceUtil();
appConf = instance.setUp();
configPath = instance.configPath;
diff --git a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
index 4e7ea8e..60afe91 100644
--- a/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
+++ b/asterixdb/asterix-yarn/src/test/java/org/apache/asterix/aoya/test/AsterixYARNLifecycleIT.java
@@ -47,6 +47,7 @@
@BeforeClass
public static void setUp() throws Exception {
+ AsterixYARNInstanceUtil.cleanUp();
instance = new AsterixYARNInstanceUtil();
appConf = instance.setUp();
configPath = instance.configPath;