Asterix NCs Fault Tolerance
This change includes the following:
- Adapt replication to unique partitions storage.
- Implement auto failover for failing NCs.
- Implement auto failover for metadata node.
- Fix for ASTERIXDB-1251 using proper error message.
- Basic replication test cases using vagrant virtual cluster for:
1. LSM bulkload components replication.
2. LSM Memory components replication and recovery.
3. Metadata node takeover.
These test cases will be part of the cluster test profile.
Change-Id: Ice26d980912a315fcb3efdd571d6ce88717cfea4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/573
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 6d0b321..8a40876 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -19,7 +19,10 @@
package org.apache.asterix.api.common;
import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
import java.util.List;
+import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -46,9 +49,14 @@
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.external.feed.api.IFeedManager;
import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.IAsterixStateProxy;
+import org.apache.asterix.metadata.api.IMetadataNode;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.replication.management.ReplicationChannel;
@@ -85,6 +93,7 @@
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
+ private static final Logger LOGGER = Logger.getLogger(AsterixAppRuntimeContext.class.getName());
private static final AsterixPropertiesAccessor ASTERIX_PROPERTIES_ACCESSOR;
@@ -128,8 +137,9 @@
private IReplicationManager replicationManager;
private IRemoteRecoveryManager remoteRecoveryManager;
private IReplicaResourcesManager replicaResourcesManager;
+ private final int metadataRmiPort;
- public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
+ public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort) {
this.ncApplicationContext = ncApplicationContext;
compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
@@ -140,6 +150,7 @@
buildProperties = new AsterixBuildProperties(ASTERIX_PROPERTIES_ACCESSOR);
replicationProperties = new AsterixReplicationProperties(ASTERIX_PROPERTIES_ACCESSOR,
AsterixClusterProperties.INSTANCE.getCluster());
+ this.metadataRmiPort = metadataRmiPort;
}
public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException {
@@ -159,7 +170,8 @@
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
- ioManager, ncApplicationContext.getNodeId());
+ ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+
localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
@@ -186,9 +198,7 @@
if (replicationProperties.isReplicationEnabled()) {
String nodeId = ncApplicationContext.getNodeId();
- replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
- AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
- replicationProperties.getReplicationStore());
+ replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -225,14 +235,14 @@
* to process any logs that might be generated during stopping these components
*/
lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
- lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
/**
- * ReplicationManager must be stopped after indexLifecycleManager so that any logs/files generated
- * during closing datasets are sent to remote replicas
+ * ReplicationManager must be stopped after indexLifecycleManager and recovery manager
+ * so that any logs/files generated during closing datasets or checkpoints are sent to remote replicas
*/
if (replicationManager != null) {
lccm.register(replicationManager);
}
+ lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
/**
* Stopping indexLifecycleManager will flush and close all datasets.
*/
@@ -380,4 +390,35 @@
public void initializeResourceIdFactory() throws HyracksDataException {
resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
}
+
+ @Override
+ public void initializeMetadata(boolean newUniverse) throws Exception {
+ IAsterixStateProxy proxy = null;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Bootstrapping metadata");
+ }
+ MetadataNode.INSTANCE.initialize(this);
+
+ proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
+ if (proxy == null) {
+ throw new IllegalStateException("Metadata node cannot access distributed state");
+ }
+
+ // This is a special case, we just give the metadataNode directly.
+ // This way we can delay the registration of the metadataNode until
+ // it is completely initialized.
+ MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
+ MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse);
+ MetadataBootstrap.startDDLRecovery();
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Metadata node bound");
+ }
+ }
+
+ @Override
+ public void exportMetadataNodeStub() throws RemoteException {
+ IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
+ ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
+ }
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 2a7b3e4..f2fc9bf 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -79,7 +79,7 @@
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
- messageBroker = new CCMessageBroker((ClusterControllerService)ccAppCtx.getControllerService());
+ messageBroker = new CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService());
this.appCtx = ccAppCtx;
if (LOGGER.isLoggable(Level.INFO)) {
@@ -87,7 +87,11 @@
}
appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
- AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection());
+ GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager(
+ (HyracksConnection) getNewHyracksClientConnection());
+
+ AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(),
+ GlobalRecoveryManager.INSTANCE);
proxy = AsterixStateProxy.registerRemoteObject();
appCtx.setDistributedState(proxy);
@@ -111,9 +115,7 @@
centralFeedManager = CentralFeedManager.getInstance();
centralFeedManager.start();
- AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
- (HyracksConnection) getNewHyracksClientConnection());
- ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
+ ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE);
AsterixReplicationProperties asterixRepliactionProperties = AsterixAppContextInfo.getInstance()
.getReplicationProperties();
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 9505692..00b7391 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -95,19 +95,12 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + deadNode + " left");
}
- AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
-
//if metadata node failed, we need to rebind the proxy connection when it joins again.
- //Note: the format for the NC should be (INSTANCE-NAME)_(NC-ID)
- if (AsterixClusterProperties.INSTANCE.getCluster() != null) {
- String instanceName = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName();
- String metadataNodeName = AsterixClusterProperties.INSTANCE.getCluster().getMetadataNode();
- String completeMetadataNodeName = instanceName + "_" + metadataNodeName;
- if (deadNode.equals(completeMetadataNodeName)) {
- MetadataManager.INSTANCE.rebindMetadataNode = true;
- }
+ String metadataNode = AsterixClusterProperties.INSTANCE.getCurrentMetadataNode();
+ if (deadNode.equals(metadataNode)) {
+ MetadataManager.INSTANCE.rebindMetadataNode = true;
}
-
+ AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
}
updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
@@ -166,7 +159,8 @@
case REMOVE_NODE:
nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
nodeRemovalRequests.add(w);
- RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w, Status.IN_PROGRESS);
+ RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w,
+ Status.IN_PROGRESS);
pendingWorkResponses.add(response);
break;
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
similarity index 91%
rename from asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
rename to asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 4fae7e9..2bac1cf 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -23,13 +23,14 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.MetadataConstants;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.feed.CentralFeedManager;
import org.apache.asterix.file.ExternalIndexingOperations;
@@ -41,20 +42,19 @@
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
-public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
+public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
private static ClusterState state;
- private static final Logger LOGGER = Logger.getLogger(AsterixGlobalRecoveryManager.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(GlobalRecoveryManager.class.getName());
private HyracksConnection hcc;
- public static AsterixGlobalRecoveryManager INSTANCE;
+ public static GlobalRecoveryManager INSTANCE;
- public AsterixGlobalRecoveryManager(HyracksConnection hcc) throws Exception {
- state = AsterixClusterProperties.INSTANCE.getState();
+ public GlobalRecoveryManager(HyracksConnection hcc) throws Exception {
+ state = ClusterState.UNUSABLE;
this.hcc = hcc;
}
@@ -67,6 +67,28 @@
@Override
public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+ startGlobalRecovery();
+ return null;
+ }
+
+ private void executeHyracksJob(JobSpecification spec) throws Exception {
+ spec.setMaxReattempts(0);
+ JobId jobId = hcc.startJob(spec);
+ hcc.waitForCompletion(jobId);
+ }
+
+ @Override
+ public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+ // Do nothing
+ }
+
+ @Override
+ public void notifyStateChange(ClusterState previousState, ClusterState newState) {
+ // Do nothing?
+ }
+
+ @Override
+ public void startGlobalRecovery() {
// perform global recovery if state changed to active
final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
@@ -84,7 +106,8 @@
List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
for (Dataverse dataverse : dataverses) {
if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
- AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse, CentralFeedManager.getInstance());
+ AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse,
+ CentralFeedManager.getInstance());
List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
dataverse.getDataverseName());
for (Dataset dataset : datasets) {
@@ -110,8 +133,8 @@
}
// 2. clean artifacts in NCs
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(
- dataset, indexes, metadataProvider);
+ JobSpecification jobSpec = ExternalIndexingOperations
+ .buildAbortOp(dataset, indexes, metadataProvider);
executeHyracksJob(jobSpec);
// 3. correct the dataset state
((ExternalDatasetDetails) dataset.getDatasetDetails())
@@ -125,8 +148,8 @@
// if ready to commit, roll forward
// 1. commit indexes in NCs
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(
- dataset, indexes, metadataProvider);
+ JobSpecification jobSpec = ExternalIndexingOperations
+ .buildRecoverOp(dataset, indexes, metadataProvider);
executeHyracksJob(jobSpec);
// 2. add pending files in metadata
for (ExternalFile file : files) {
@@ -134,7 +157,8 @@
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
- } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+ } else if (file
+ .getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
// find original file
for (ExternalFile originalFile : files) {
if (originalFile.getFileName().equals(file.getFileName())) {
@@ -145,7 +169,8 @@
break;
}
}
- } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+ } else if (file
+ .getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
// find original file
for (ExternalFile originalFile : files) {
if (originalFile.getFileName().equals(file.getFileName())) {
@@ -196,22 +221,5 @@
state = newState;
recoveryThread.start();
}
- return null;
- }
-
- private void executeHyracksJob(JobSpecification spec) throws Exception {
- spec.setMaxReattempts(0);
- JobId jobId = hcc.startJob(spec);
- hcc.waitForCompletion(jobId);
- }
-
- @Override
- public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
- // Do nothing
- }
-
- @Override
- public void notifyStateChange(ClusterState previousState, ClusterState newState) {
- // Do nothing?
}
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 3341387..dac9af5 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -20,7 +20,6 @@
import java.io.File;
import java.io.IOException;
-import java.rmi.server.UnicastRemoteObject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,10 +41,6 @@
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataNode;
-import org.apache.asterix.metadata.api.IAsterixStateProxy;
-import org.apache.asterix.metadata.api.IMetadataNode;
import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -102,7 +97,7 @@
LOGGER.info("Starting Asterix node controller: " + nodeId);
}
- runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
+ runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext, metadataRmiPort);
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMetadataProperties();
if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
@@ -215,33 +210,12 @@
localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
}
- IAsterixStateProxy proxy = null;
isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
if (isMetadataNode) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Bootstrapping metadata");
- }
- MetadataNode.INSTANCE.initialize(runtimeContext);
-
- proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
- if (proxy == null) {
- throw new IllegalStateException("Metadata node cannot access distributed state");
- }
-
- //This is a special case, we just give the metadataNode directly.
- //This way we can delay the registration of the metadataNode until
- //it is completely initialized.
- MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
- MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
- systemState == SystemState.NEW_UNIVERSE);
- MetadataBootstrap.startDDLRecovery();
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Metadata node bound");
- }
+ runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
}
-
ExternalLibraryBootstrap.setUpExternaLibraries(isMetadataNode);
+
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting lifecycle components");
}
@@ -267,9 +241,7 @@
recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
if (isMetadataNode) {
- IMetadataNode stub = null;
- stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
- proxy.setMetadataNode(stub);
+ runtimeContext.exportMetadataNodeStub();
}
//Clean any temporary files
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 095ef1b..aeaef59 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -29,14 +29,17 @@
import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.hyracks.api.messages.IMessage;
-import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
-public class CCMessageBroker implements IMessageBroker {
+public class CCMessageBroker implements ICCMessageBroker {
private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
private final AtomicLong globalResourceId = new AtomicLong(0);
@@ -58,6 +61,12 @@
case REPORT_MAX_RESOURCE_ID_RESPONSE:
handleReportResourceMaxIdResponse(message, nodeId);
break;
+ case TAKEOVER_PARTITIONS_RESPONSE:
+ handleTakeoverPartitionsResponse(message);
+ break;
+ case TAKEOVER_METADATA_NODE_RESPONSE:
+ handleTakeoverMetadataNodeResponse(message);
+ break;
default:
LOGGER.warning("Unknown message: " + absMessage.getMessageType());
break;
@@ -89,7 +98,8 @@
nodesReportedMaxResourceId.add(nodeId);
}
- private void sendApplicationMessageToNC(IMessage msg, String nodeId) throws Exception {
+ @Override
+ public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception {
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
NodeControllerState state = nodeMap.get(nodeId);
state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
@@ -106,4 +116,14 @@
}
}
}
+
+ private void handleTakeoverPartitionsResponse(IMessage message) {
+ TakeoverPartitionsResponseMessage msg = (TakeoverPartitionsResponseMessage) message;
+ AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(msg);
+ }
+
+ private void handleTakeoverMetadataNodeResponse(IMessage message) {
+ TakeoverMetadataNodeResponseMessage msg = (TakeoverMetadataNodeResponseMessage) message;
+ AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(msg);
+ }
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 001771e..8f8723e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -25,9 +25,13 @@
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessage;
import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -75,11 +79,40 @@
case REPORT_MAX_RESOURCE_ID_REQUEST:
reportMaxResourceId();
break;
+ case TAKEOVER_PARTITIONS_REQUEST:
+ handleTakeoverPartitons(message);
+ break;
+ case TAKEOVER_METADATA_NODE_REQUEST:
+ handleTakeoverMetadataNode(message);
+ break;
default:
break;
}
}
+ private void handleTakeoverPartitons(IMessage message) throws Exception {
+ TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
+ IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+ .getApplicationObject();
+ IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+ remoteRecoeryManager.takeoverPartitons(msg.getFailedNode(), msg.getPartitions());
+ //send response after takeover is completed
+ TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
+ appContext.getTransactionSubsystem().getId(), msg.getPartitions());
+ sendMessage(reponse, null);
+ }
+
+ private void handleTakeoverMetadataNode(IMessage message) throws Exception {
+ IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+ .getApplicationObject();
+ appContext.initializeMetadata(false);
+ appContext.exportMetadataNodeStub();
+ //send response after takeover is completed
+ TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
+ appContext.getTransactionSubsystem().getId());
+ sendMessage(reponse, null);
+ }
+
@Override
public void reportMaxResourceId() throws Exception {
IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3386252..975180b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.api;
import java.io.IOException;
+import java.rmi.RemoteException;
import java.util.List;
import java.util.concurrent.Executor;
@@ -89,4 +90,17 @@
public IReplicationChannel getReplicationChannel();
public void initializeResourceIdFactory() throws HyracksDataException;
+
+ /**
+ * Exports the metadata node to the metadata RMI port.
+ * @throws RemoteException
+ */
+ public void exportMetadataNodeStub() throws RemoteException;
+
+ /**
+ * Initializes the metadata node and bootstraps the metadata.
+ * @param newUniverse
+ * @throws Exception
+ */
+ public void initializeMetadata(boolean newUniverse) throws Exception;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
new file mode 100644
index 0000000..48b1e73
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.asterix.common.api.IClusterEventsSubscriber;
+
+public interface IGlobalRecoveryMaanger extends IClusterEventsSubscriber {
+
+ /**
+ * Starts the global recovery process if the cluster state changed to ACTIVE.
+ */
+ public void startGlobalRecovery();
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 8e2c4e7..473a163 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -65,4 +65,8 @@
public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
return accessor.getClusterPartitions();
}
+
+ public Map<String, String> getTransactionLogDirs() {
+ return accessor.getTransactionLogDirs();
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 1ef7e3e..fa5b503 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -36,7 +36,6 @@
private static int REPLICATION_TIME_OUT_DEFAULT = 15;
private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
- private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
private final String NODE_NAME_PREFIX;
private final Cluster cluster;
@@ -102,8 +101,8 @@
}
if (nodeIndex == -1) {
- LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
- + " in cluster configurations");
+ LOGGER.log(Level.WARNING,
+ "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
return null;
}
@@ -179,13 +178,6 @@
return replicaIds;
}
- public String getReplicationStore() {
- if (cluster != null) {
- return cluster.getDataReplication().getReplicationStore();
- }
- return REPLICATION_STORE_DEFAULT;
- }
-
public int getReplicationFactor() {
if (cluster != null) {
if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
@@ -202,5 +194,4 @@
}
return REPLICATION_TIME_OUT_DEFAULT;
}
-
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
index 8dc3efe..a5cd72b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.dataflow;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.common.IStorageManagerInterface;
@@ -48,4 +49,6 @@
* @return ICCApplicationContext implementation instance
*/
public ICCApplicationContext getCCApplicationContext();
+
+ public IGlobalRecoveryMaanger getGlobalRecoveryManager();
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..78b86a8
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..78f7429
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+
+ public TakeoverMetadataNodeResponseMessage(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..abfa7d2
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final Integer[] partitions;
+ private final String failedNode;
+ private final long requestId;
+ private final String nodeId;
+
+ public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
+ Integer[] partitionsToTakeover) {
+ this.requestId = requestId;
+ this.nodeId = nodeId;
+ this.failedNode = failedNode;
+ this.partitions = partitionsToTakeover;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
+ }
+
+ public Integer[] getPartitions() {
+ return partitions;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ public String getFailedNode() {
+ return failedNode;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Request ID: " + requestId);
+ sb.append(" Node ID: " + nodeId);
+ sb.append(" Failed Node: " + failedNode);
+ sb.append(" Partitions: ");
+ for (Integer partitionId : partitions) {
+ sb.append(partitionId + ",");
+ }
+ //remove last comma
+ sb.charAt(sb.length() - 1);
+ return sb.toString();
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..86eb3cb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final Integer[] partitions;
+ private final String nodeId;
+ private final long requestId;
+
+ public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+ this.requestId = requestId;
+ this.nodeId = nodeId;
+ this.partitions = partitionsToTakeover;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
+ }
+
+ public Integer[] getPartitions() {
+ return partitions;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 61ab7cd..57a0dae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -26,7 +26,11 @@
RESOURCE_ID_REQUEST,
RESOURCE_ID_RESPONSE,
REPORT_MAX_RESOURCE_ID_REQUEST,
- REPORT_MAX_RESOURCE_ID_RESPONSE
+ REPORT_MAX_RESOURCE_ID_RESPONSE,
+ TAKEOVER_PARTITIONS_REQUEST,
+ TAKEOVER_PARTITIONS_RESPONSE,
+ TAKEOVER_METADATA_NODE_REQUEST,
+ TAKEOVER_METADATA_NODE_RESPONSE
}
public abstract ApplicationMessageType getMessageType();
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
new file mode 100644
index 0000000..7dafbd5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+public interface ICCMessageBroker extends IMessageBroker {
+
+ /**
+ * Sends the passed message to the specified {@code nodeId}
+ * @param msg
+ * @param nodeId
+ * @throws Exception
+ */
+ public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception;
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 63d29a0..ecc9494 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -18,8 +18,24 @@
*/
package org.apache.asterix.common.replication;
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+
public interface IRemoteRecoveryManager {
+ /**
+ * Attempts to perform the remote recovery process from an active remote replica.
+ */
public void performRemoteRecovery();
+ /**
+ * Performs the partitions takeover process from the {@code failedNode}
+ * @param failedNode
+ * @param partitions
+ * @throws IOException
+ * @throws ACIDException
+ */
+ public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index c796f37..f13d300 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -22,10 +22,15 @@
public interface IReplicaResourcesManager {
- public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
-
- public String getLocalStorageFolder();
-
+ /**
+ * @param remoteNodes
+ * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
+ */
public long getMinRemoteLSN(Set<String> remoteNodes);
+ /**
+ * @param partitions
+ * @return the minimum LSN of all indexes that belong to {@code partitions}.
+ */
+ public long getPartitionsMinLSN(Integer[] partitions);
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 9ea9957..a2b7a82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -111,4 +111,14 @@
* @throws HyracksDataException
*/
public long getLocalMinFirstLSN() throws HyracksDataException;
+
+ /**
+ * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
+ * @param partitions
+ * @param lowWaterMarkLSN
+ * @param failedNode
+ * @throws IOException
+ * @throws ACIDException
+ */
+ public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index acfb9d5..48e42bd 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -56,6 +56,14 @@
}
public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
- return dataverseName + File.separator + datasetName + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + idxName;
+ return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+ }
+
+ public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
+ return dataverseName + File.separator + fullIndexName;
+ }
+
+ private static String prepareFullIndexName(String datasetName, String idxName) {
+ return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
}
}
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 872c959..e0605f0 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -47,7 +47,7 @@
<xs:element name="enabled" type="xs:boolean" />
<xs:element name="replication_port" type="xs:integer" />
<xs:element name="replication_factor" type="xs:integer" />
- <xs:element name="replication_store" type="xs:string" />
+ <xs:element name="auto_failover" type="xs:boolean" />
<xs:element name="replication_time_out" type="xs:integer" />
<!-- definition of complex elements -->
@@ -82,7 +82,7 @@
<xs:element ref="cl:enabled" />
<xs:element ref="cl:replication_port" />
<xs:element ref="cl:replication_factor" />
- <xs:element ref="cl:replication_store" />
+ <xs:element ref="cl:auto_failover" />
<xs:element ref="cl:replication_time_out" />
</xs:sequence>
</xs:complexType>
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 447e96d..f54db63 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -24,12 +24,14 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -54,6 +56,7 @@
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
public class TestExecutor {
@@ -361,6 +364,14 @@
return getProcessOutput(p);
}
+ private static String executeVagrantScript(ProcessBuilder pb, String node, String scriptName) throws Exception {
+ pb.command("vagrant", "ssh", node, "--", pb.environment().get("SCRIPT_HOME") + scriptName);
+ Process p = pb.start();
+ p.waitFor();
+ InputStream input = p.getInputStream();
+ return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+ }
+
private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) {
String targetWord = "queries" + File.separator;
int targetWordSize = targetWord.lastIndexOf(File.separator);
@@ -565,6 +576,22 @@
}
System.err.println("...but that was expected.");
break;
+ case "vagrant_script":
+ try {
+ String[] command = statement.trim().split(" ");
+ if (command.length != 2) {
+ throw new Exception("invalid vagrant script format");
+ }
+ String nodeId = command[0];
+ String scriptName = command[1];
+ String output = executeVagrantScript(pb, nodeId, scriptName);
+ if (output.contains("ERROR")) {
+ throw new Exception(output);
+ }
+ } catch (Exception e) {
+ throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
+ }
+ break;
default:
throw new IllegalArgumentException("No statements of type " + ctx.getType());
}
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index 6085019..ce84cc8 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -315,10 +315,6 @@
patternList.addAll(createRemoveAsterixLogDirPattern(instance).getPattern());
patternList.addAll(createRemoveAsterixRootMetadata(instance).getPattern());
patternList.addAll(createRemoveAsterixTxnLogs(instance).getPattern());
- if (instance.getCluster().getDataReplication() != null
- && instance.getCluster().getDataReplication().isEnabled()) {
- patternList.addAll(createRemoveAsterixReplicationPattern(instance).getPattern());
- }
Patterns patterns = new Patterns(patternList);
return patterns;
}
@@ -647,29 +643,4 @@
Patterns patterns = new Patterns(patternList);
return patterns;
}
-
- private Patterns createRemoveAsterixReplicationPattern(AsterixInstance instance) throws Exception {
-
- List<Pattern> patternList = new ArrayList<Pattern>();
- Cluster cluster = instance.getCluster();
-
- Nodeid nodeid = null;
- String pargs = null;
- Event event = null;
- for (Node node : cluster.getNode()) {
- String[] nodeIODevices;
- String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
- nodeIODevices = iodevices.trim().split(",");
- for (String nodeIODevice : nodeIODevices) {
- pargs = nodeIODevice + File.separator + cluster.getDataReplication().getReplicationStore();
- nodeid = new Nodeid(new Value(null, node.getId()));
- event = new Event("file_delete", nodeid, pargs);
- patternList.add(new Pattern(null, 1, null, event));
- }
- }
-
- Patterns patterns = new Patterns(patternList);
- return patterns;
- }
-
}
diff --git a/asterix-installer/pom.xml b/asterix-installer/pom.xml
index 34c3e0a..b257e71 100644
--- a/asterix-installer/pom.xml
+++ b/asterix-installer/pom.xml
@@ -29,6 +29,7 @@
<failsafe.test.excludes>**/DmlRecoveryIT.java</failsafe.test.excludes>
<cluster.test.excludes>**/AsterixClusterLifeCycleIT.java</cluster.test.excludes>
<cluster.extest.excludes>**/ClusterExecutionIT.java</cluster.extest.excludes>
+ <replication.test.excludes>**/ReplicationIT.java</replication.test.excludes>
</properties>
<licenses>
@@ -123,6 +124,7 @@
<exclude>${failsafe.test.excludes}</exclude>
<exclude>${cluster.test.excludes}</exclude>
<exclude>${cluster.extest.excludes}</exclude>
+ <exclude>${replication.test.excludes}</exclude>
</excludes>
</configuration>
<executions>
@@ -194,6 +196,8 @@
<forkMode>pertest</forkMode>
<excludes>
<exclude>${failsafe.test.excludes}</exclude>
+ <exclude>${cluster.test.excludes}</exclude>
+ <exclude>${cluster.extest.excludes}</exclude>
</excludes>
</configuration>
<executions>
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 09c65c8..9559394 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -288,12 +288,6 @@
valid = false;
}
- if (cluster.getDataReplication().getReplicationStore() == null
- || cluster.getDataReplication().getReplicationStore().length() == 0) {
- valid = false;
- LOGGER.fatal("Replication store not defined. " + ERROR);
- }
-
if (cluster.getDataReplication().getReplicationPort() == null
|| cluster.getDataReplication().getReplicationPort().toString().length() == 0) {
valid = false;
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
new file mode 100644
index 0000000..86c15ae
--- /dev/null
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.installer.test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.codehaus.plexus.util.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class ReplicationIT {
+
+ private static final String PATH_BASE = StringUtils
+ .join(new String[] { "src", "test", "resources", "integrationts", "replication" }, File.separator);
+ private static final String CLUSTER_BASE = StringUtils
+ .join(new String[] { "src", "test", "resources", "clusterts" }, File.separator);
+ private static final String PATH_ACTUAL = "repliationtest" + File.separator;
+ private static String managixFolderName;
+ private static final Logger LOGGER = Logger.getLogger(ReplicationIT.class.getName());
+ private static File asterixProjectDir = new File(System.getProperty("user.dir"));
+ private static final String CLUSTER_CC_ADDRESS = "10.10.0.2";
+ private static final int CLUSTER_CC_API_PORT = 19002;
+ private static ProcessBuilder pb;
+ private static Map<String, String> env;
+ private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS, CLUSTER_CC_API_PORT);
+ private static String SCRIPT_HOME;
+ protected TestCaseContext tcCtx;
+
+ public ReplicationIT(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ File outdir = new File(PATH_ACTUAL);
+ outdir.mkdirs();
+
+ // vagrant setup
+ File installerTargetDir = new File(asterixProjectDir, "target");
+ String[] installerFiles = installerTargetDir.list(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return new File(dir, name).isDirectory() && name.startsWith("asterix-installer")
+ && name.endsWith("binary-assembly");
+ }
+ });
+
+ if (installerFiles == null || installerFiles.length == 0) {
+ throw new Exception("Couldn't find installer binaries");
+ }
+
+ managixFolderName = installerFiles[0];
+
+ //copy tests data
+ FileUtils.copyDirectoryStructure(
+ new File(StringUtils.join(
+ new String[] { "..", "asterix-replication", "src", "test", "resources", "data" },
+ File.separator)),
+ new File(StringUtils.join(new String[] { "src", "test", "resources", "clusterts", "data" },
+ File.separator)));
+
+ //copy tests scripts
+ FileUtils.copyDirectoryStructure(
+ new File(StringUtils.join(
+ new String[] { "..", "asterix-replication", "src", "test", "resources", "scripts" },
+ File.separator)),
+ new File(StringUtils.join(new String[] { "src", "test", "resources", "clusterts", "scripts" },
+ File.separator)));
+
+ invoke("cp", "-r", installerTargetDir.toString() + "/" + managixFolderName,
+ asterixProjectDir + "/" + CLUSTER_BASE);
+
+ remoteInvoke("cp -r /vagrant/" + managixFolderName + " /tmp/asterix");
+
+ SCRIPT_HOME = "/vagrant/scripts/";
+ pb = new ProcessBuilder();
+ env = pb.environment();
+ env.put("SCRIPT_HOME", SCRIPT_HOME);
+ File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
+ pb.directory(cwd);
+ pb.redirectErrorStream(true);
+
+ //make scripts executable
+ String chmodScriptsCmd = "chmod -R +x " + SCRIPT_HOME;
+ remoteInvoke(chmodScriptsCmd, "cc");
+ remoteInvoke(chmodScriptsCmd, "nc1");
+ remoteInvoke(chmodScriptsCmd, "nc2");
+
+ //managix configure
+ logOutput(managixInvoke("configure").getInputStream());
+
+ //managix validate
+ String validateOutput = IOUtils.toString(managixInvoke("validate").getInputStream(),
+ StandardCharsets.UTF_8.name());
+ if (validateOutput.contains("ERROR")) {
+ throw new Exception("Managix validate error: " + validateOutput);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ //remove files
+ remoteInvoke("rm -rf /vagrant/asterix");
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ //create instance
+ managixInvoke("create -n vagrant-ssh -c /vagrant/cluster_with_replication.xml").getInputStream();
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ //stop instance
+ managixInvoke("stop -n vagrant-ssh");
+
+ //verify that all processes have been stopped
+ String killProcesses = "kill_cc_and_nc.sh";
+ executeVagrantScript("cc", killProcesses);
+ executeVagrantScript("nc1", killProcesses);
+ executeVagrantScript("nc2", killProcesses);
+
+ //delete storage
+ String deleteStorage = "delete_storage.sh";
+ executeVagrantScript("cc", deleteStorage);
+ executeVagrantScript("nc1", deleteStorage);
+ executeVagrantScript("nc2", deleteStorage);
+
+ //delete instance
+ managixInvoke("delete -n vagrant-ssh");
+ }
+
+ @Test
+ public void test() throws Exception {
+ testExecutor.executeTest(PATH_ACTUAL, tcCtx, pb, false);
+ }
+
+ @Parameters(name = "ReplicationIT {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ Collection<Object[]> testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+ if (testArgs.size() == 0) {
+ testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+ }
+ return testArgs;
+ }
+
+ protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+ Collection<Object[]> testArgs = new ArrayList<Object[]>();
+ TestCaseContext.Builder b = new TestCaseContext.Builder();
+ for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+ testArgs.add(new Object[] { ctx });
+ }
+ return testArgs;
+ }
+
+ public static boolean checkOutput(InputStream input, String requiredSubString) {
+ String candidate;
+ try {
+ candidate = IOUtils.toString(input, StandardCharsets.UTF_8.name());
+ } catch (IOException e) {
+ LOGGER.warning("Could not check output of subprocess");
+ return false;
+ }
+ return candidate.contains(requiredSubString);
+ }
+
+ public static boolean checkOutput(String candidate, String requiredSubString) {
+ return candidate.contains(requiredSubString);
+ }
+
+ public static String processOut(Process p) throws IOException {
+ InputStream input = p.getInputStream();
+ return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+ }
+
+ public static void logOutput(InputStream input) {
+ try {
+ LOGGER.info(IOUtils.toString(input, StandardCharsets.UTF_8.name()));
+ } catch (IOException e) {
+ LOGGER.warning("Could not print output of subprocess");
+ }
+ }
+
+ private static Process invoke(String... args) throws Exception {
+ ProcessBuilder pb = new ProcessBuilder(args);
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ return p;
+ }
+
+ private static Process remoteInvoke(String cmd) throws Exception {
+ ProcessBuilder pb = new ProcessBuilder("vagrant", "ssh", "cc", "-c", "MANAGIX_HOME=/tmp/asterix/ " + cmd);
+ File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
+ pb.directory(cwd);
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ p.waitFor();
+ return p;
+ }
+
+ private static Process remoteInvoke(String cmd, String node) throws Exception {
+ ProcessBuilder pb = new ProcessBuilder("vagrant", "ssh", node, "--", cmd);
+ File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
+ pb.directory(cwd);
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ p.waitFor();
+ return p;
+ }
+
+ private static Process managixInvoke(String cmd) throws Exception {
+ return remoteInvoke("/tmp/asterix/bin/managix " + cmd);
+ }
+
+ private static String executeVagrantScript(String node, String scriptName) throws Exception {
+ pb.command("vagrant", "ssh", node, "--", SCRIPT_HOME + scriptName);
+ Process p = pb.start();
+ p.waitFor();
+ InputStream input = p.getInputStream();
+ return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+ }
+}
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml b/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
new file mode 100644
index 0000000..b9f4658
--- /dev/null
+++ b/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
@@ -0,0 +1,63 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+
+ <name>vagrant</name>
+
+ <username>vagrant</username>
+
+ <working_dir>
+ <dir>/vagrant/asterix/managix-working</dir>
+ <NFS>true</NFS>
+ </working_dir>
+
+ <log_dir>/home/vagrant/asterix/logs/</log_dir>
+ <txn_log_dir>/home/vagrant/asterix/tx_logs</txn_log_dir>
+ <iodevices>/home/vagrant/asterix/p1,/home/vagrant/asterix/p2</iodevices>
+
+ <store>storage</store>
+
+ <java_home>/usr/java/latest</java_home>
+ <metadata_node>nc1</metadata_node>
+
+ <data_replication>
+ <enabled>true</enabled>
+ <replication_port>2000</replication_port>
+ <replication_factor>2</replication_factor>
+ <auto_failover>true</auto_failover>
+ <replication_time_out>10</replication_time_out>
+ </data_replication>
+
+ <master_node>
+ <id>cc</id>
+ <client_ip>10.10.0.2</client_ip>
+ <cluster_ip>10.10.0.2</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>10.10.0.3</cluster_ip>
+ </node>
+ <node>
+ <id>nc2</id>
+ <cluster_ip>10.10.0.4</cluster_ip>
+ </node>
+</cluster>
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/clusterts/known_hosts b/asterix-installer/src/test/resources/clusterts/known_hosts
index 5ab452a..273f9f3 100644
--- a/asterix-installer/src/test/resources/clusterts/known_hosts
+++ b/asterix-installer/src/test/resources/clusterts/known_hosts
@@ -1,6 +1,6 @@
-nc1,10.10.0.3 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-nc2,10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-nc3,10.10.0.5 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-127.0.0.1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
::1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-10.10.0.2 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
+127.0.0.1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+nc1,10.10.0.3 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+nc2,10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==10.10.0.2 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+10.10.0.2 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql
new file mode 100644
index 0000000..6a2441e
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : bulkload.aql
+ * Description : Check that Bulkload LSM component are replicated correclty.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, query data, kill one node
+ and wait until the failover complete, query the data again.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+ organization-name: string,
+ start-date: date,
+ end-date: date?
+}
+
+create type FacebookUserType as closed {
+ id: int,
+ alias: string,
+ name: string,
+ user-since: datetime,
+ friend-ids: {{ int32 }},
+ employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
new file mode 100644
index 0000000..ae14ad0
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : bulkload.aql
+ * Description : Check that Bulkload LSM component are replicated correclty.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, query data, kill one node
+ and wait until the failover complete, query the data again.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+load dataset FacebookUsers using localfs
+(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql
new file mode 100644
index 0000000..9c8cb96
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : bulkload.aql
+ * Description : Check that Bulkload LSM component are replicated correclty.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, query data, kill one node
+ and wait until the failover complete, query the data again.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsers return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql
new file mode 100644
index 0000000..9c8cb96
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : bulkload.aql
+ * Description : Check that Bulkload LSM component are replicated correclty.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, query data, kill one node
+ and wait until the failover complete, query the data again.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsers return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql
new file mode 100644
index 0000000..8de2067
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : mem_component_recovery.aql
+ * Description : Check that Memory LSM component are replicated and recovered correclty.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset, query
+ data from memory, kill one node and wait until the failover complete,
+ query the data again.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+ organization-name: string,
+ start-date: date,
+ end-date: date?
+}
+
+create type FacebookUserType as closed {
+ id: int,
+ alias: string,
+ name: string,
+ user-since: datetime,
+ friend-ids: {{ int32 }},
+ employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
+
+create dataset FacebookUsersInMemory(FacebookUserType)
+primary key id;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
new file mode 100644
index 0000000..8087689
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : mem_component_recovery.aql
+ * Description : Check that Memory LSM component are replicated and recovered correclty.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset, query
+ data from memory, kill one node and wait until the failover complete,
+ query the data again.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+use dataverse TinySocial;
+
+load dataset FacebookUsers using localfs
+(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
+
+insert into dataset TinySocial.FacebookUsersInMemory(for $x in dataset TinySocial.FacebookUsers return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql
new file mode 100644
index 0000000..e25e409
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : mem_component_recovery.aql
+ * Description : Check that Memory LSM component are replicated and recovered correclty.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset, query
+ data from memory, kill one node and wait until the failover complete,
+ query the data again.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql
new file mode 100644
index 0000000..e25e409
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : mem_component_recovery.aql
+ * Description : Check that Memory LSM component are replicated and recovered correclty.
+ The test goes as follows:
+ start 2 nodes, bulkload a dataset, copy it to in-memory dataset, query
+ data from memory, kill one node and wait until the failover complete,
+ query the data again.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql
new file mode 100644
index 0000000..113d144
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Check that metadata node failover is done correctly.
+ The test goes as follows:
+ start 2 nodes, create a dataset, kill metadata node
+ and wait until the failover complete, verify the
+ dataset still exists.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+ organization-name: string,
+ start-date: date,
+ end-date: date?
+}
+
+create type FacebookUserType as closed {
+ id: int,
+ alias: string,
+ name: string,
+ user-since: datetime,
+ friend-ids: {{ int32 }},
+ employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql
new file mode 100644
index 0000000..76bdcfe
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Check that metadata node failover is done correctly.
+ The test goes as follows:
+ start 2 nodes, create a dataset, kill metadata node
+ and wait until the failover complete, verify the
+ dataset still exists.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+for $x in dataset Metadata.Dataset where $x.DatasetName ='FacebookUsers' return $x.DatasetName;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
new file mode 100644
index 0000000..5eec164
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
@@ -0,0 +1 @@
+nc1 kill_cc_and_nc.sh
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql
new file mode 100644
index 0000000..ac1c593
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name : metadata_node_recovery.aql
+ * Description : Check that metadata node failover is done correctly.
+ The test goes as follows:
+ start 2 nodes, create a dataset, kill metadata node
+ and wait until the failover complete, verify the
+ dataset still exists.
+ * Expected Result : Success
+ * Date : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+for $x in dataset Metadata.Dataset where $x.DatasetName ='FacebookUsers' return $x.DatasetName;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
new file mode 100644
index 0000000..f033086
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
@@ -0,0 +1,37 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
+ <test-group name="failover">
+ <test-case FilePath="failover">
+ <compilation-unit name="bulkload">
+ <output-dir compare="Text">bulkload</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="failover">
+ <compilation-unit name="mem_component_recovery">
+ <output-dir compare="Text">mem_component_recovery</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="failover">
+ <compilation-unit name="metadata_node">
+ <output-dir compare="Text">metadata_node</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index aa7f7d5..823e861 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -383,6 +383,12 @@
dataLifecycleManager.register(absolutePath, lsmBtree);
} else {
final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
+ if (resource == null) {
+ throw new Exception("Could not find required metadata indexes. Please delete "
+ + propertiesProvider.getMetadataProperties().getTransactionLogDirs()
+ .get(runtimeContext.getTransactionSubsystem().getId())
+ + " to intialize as a new instance. (WARNING: all data will be lost.)");
+ }
resourceID = resource.getResourceId();
lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
if (lsmBtree == null) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index b6f3c9e..601ce15 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -142,4 +142,10 @@
FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
}
+
+ public static String getIndexPath(String partitionPath, int partition, String dataverse, String fullIndexName) {
+ String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+ return partitionPath + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition) + File.separator
+ + StoragePathUtil.prepareDataverseIndexName(dataverse, fullIndexName);
+ }
}
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
index 866162b..082618b 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
@@ -20,10 +20,11 @@
import java.util.logging.Logger;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
+import org.apache.asterix.common.config.AsterixBuildProperties;
import org.apache.asterix.common.config.AsterixCompilerProperties;
import org.apache.asterix.common.config.AsterixExternalProperties;
import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.config.AsterixBuildProperties;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.config.AsterixPropertiesAccessor;
import org.apache.asterix.common.config.AsterixReplicationProperties;
@@ -56,12 +57,13 @@
private AsterixFeedProperties feedProperties;
private AsterixBuildProperties buildProperties;
private AsterixReplicationProperties replicationProperties;
-
+ private final IGlobalRecoveryMaanger globalRecoveryMaanger;
private IHyracksClientConnection hcc;
- public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc) throws AsterixException {
+ public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+ IGlobalRecoveryMaanger globalRecoveryMaanger) throws AsterixException {
if (INSTANCE == null) {
- INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc);
+ INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger);
}
AsterixPropertiesAccessor propertiesAccessor = new AsterixPropertiesAccessor();
INSTANCE.compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
@@ -77,9 +79,11 @@
Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
}
- private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc) {
+ private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+ IGlobalRecoveryMaanger globalRecoveryMaanger) {
this.appCtx = ccAppCtx;
this.hcc = hcc;
+ this.globalRecoveryMaanger = globalRecoveryMaanger;
}
public static AsterixAppContextInfo getInstance() {
@@ -144,4 +148,9 @@
public AsterixReplicationProperties getReplicationProperties() {
return replicationProperties;
}
+
+ @Override
+ public IGlobalRecoveryMaanger getGlobalRecoveryManager() {
+ return globalRecoveryMaanger;
+ }
}
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 80008c5..c2c28df 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -35,6 +35,12 @@
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -45,6 +51,11 @@
*/
public class AsterixClusterProperties {
+ /**
+ * TODO: currently after instance restarts we require all nodes to join again, otherwise the cluster wont be ACTIVE.
+ * we may overcome this by storing the cluster state before the instance shutdown and using it on startup to identify
+ * the nodes that are expected the join.
+ */
private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
@@ -63,6 +74,12 @@
private Map<String, ClusterPartition[]> node2PartitionsMap = null;
private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+ private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
+
+ private long takeoverRequestId = 0;
+ private String currentMetadataNode = null;
+ private boolean isMetadataNodeActive = false;
+ private boolean autoFailover = false;
private AsterixClusterProperties() {
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -82,6 +99,13 @@
if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
+ currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+ if (isAutoFailoverEnabled()) {
+ autoFailover = cluster.getDataReplication().isAutoFailover();
+ }
+ if (autoFailover) {
+ pendingTakeoverRequests = new HashMap<>();
+ }
}
}
}
@@ -91,18 +115,24 @@
public synchronized void removeNCConfiguration(String nodeId) {
updateNodePartitions(nodeId, false);
ncConfiguration.remove(nodeId);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" Removing configuration parameters for node id " + nodeId);
+ if (nodeId.equals(currentMetadataNode)) {
+ isMetadataNodeActive = false;
+ LOGGER.info("Metadata node is now inactive");
}
- // TODO implement fault tolerance as follows:
- // 1. collect the partitions of the failed NC
- // 2. For each partition, request a remote replica to take over.
- // 3. wait until each remote replica completes the recovery for the lost partitions
- // 4. update the cluster state
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Removing configuration parameters for node id " + nodeId);
+ }
+ if (autoFailover) {
+ requestPartitionsTakeover(nodeId);
+ }
}
public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
ncConfiguration.put(nodeId, configuration);
+ if (nodeId.equals(currentMetadataNode)) {
+ isMetadataNodeActive = true;
+ LOGGER.info("Metadata node is now active");
+ }
updateNodePartitions(nodeId, true);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" Registering configuration parameters for node id " + nodeId);
@@ -118,8 +148,6 @@
p.setActive(added);
if (added) {
p.setActiveNodeId(nodeId);
- } else {
- p.setActiveNodeId(null);
}
}
resetClusterPartitionConstraint();
@@ -135,13 +163,20 @@
return;
}
}
- // if all storage partitions are active, then the cluster is active
- state = ClusterState.ACTIVE;
- LOGGER.info("Cluster is now ACTIVE");
+ //if all storage partitions are active as well as the metadata node, then the cluster is active
+ if (isMetadataNodeActive) {
+ state = ClusterState.ACTIVE;
+ LOGGER.info("Cluster is now ACTIVE");
+ //start global recovery
+ AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery();
+ } else {
+ requestMetadataNodeTakeover();
+ }
}
/**
* Returns the number of IO devices configured for a Node Controller
+ *
* @param nodeId
* unique identifier of the Node Controller
* @return number of IO devices. -1 if the node id is not valid. A node id
@@ -155,6 +190,7 @@
/**
* Returns the IO devices configured for a Node Controller
+ *
* @param nodeId
* unique identifier of the Node Controller
* @return a list of IO devices. null if node id is not valid. A node id is not valid
@@ -257,4 +293,133 @@
// virtual cluster without cluster config file
return DEFAULT_STORAGE_DIR_NAME;
}
-}
+
+ private synchronized void requestPartitionsTakeover(String failedNodeId) {
+ //replica -> list of partitions to takeover
+ Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+ AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+ .getReplicationProperties();
+
+ //collect the partitions of the failed NC
+ List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
+ for (ClusterPartition partition : lostPartitions) {
+ //find replicas for this partitions
+ Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+ //find a replica that is still active
+ for (String replica : partitionReplicas) {
+ //TODO (mhubail) currently this assigns the partition to the first found active replica.
+ //It needs to be modified to consider load balancing.
+ if (ncConfiguration.containsKey(replica)) {
+ if (!partitionRecoveryPlan.containsKey(replica)) {
+ List<Integer> replicaPartitions = new ArrayList<>();
+ replicaPartitions.add(partition.getPartitionId());
+ partitionRecoveryPlan.put(replica, replicaPartitions);
+ } else {
+ partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+ }
+ }
+ }
+ }
+
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ .getCCApplicationContext().getMessageBroker();
+ //For each replica, send a request to takeover the assigned partitions
+ for (String replica : partitionRecoveryPlan.keySet()) {
+ Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
+ long requestId = takeoverRequestId++;
+ TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, replica,
+ failedNodeId, partitionsToTakeover);
+ pendingTakeoverRequests.put(requestId, takeoverRequest);
+ try {
+ messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+ } catch (Exception e) {
+ /**
+ * if we fail to send the request, it means the NC we tried to send the request to
+ * has failed. When the failure notification arrives, we will send any pending request
+ * that belongs to the failed NC to a different active replica.
+ */
+ LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+ List<ClusterPartition> nodePartitions = new ArrayList<>();
+ for (ClusterPartition partition : clusterPartitions.values()) {
+ if (partition.getActiveNodeId().equals(nodeId)) {
+ nodePartitions.add(partition);
+ }
+ }
+ /**
+ * if there is any pending takeover request that this node was supposed to handle,
+ * it needs to be sent to a different replica
+ */
+ List<Long> failedTakeoverRequests = new ArrayList<>();
+ for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
+ if (request.getNodeId().equals(nodeId)) {
+ for (Integer partitionId : request.getPartitions()) {
+ nodePartitions.add(clusterPartitions.get(partitionId));
+ }
+ failedTakeoverRequests.add(request.getId());
+ }
+ }
+
+ //remove failed requests
+ for (Long requestId : failedTakeoverRequests) {
+ pendingTakeoverRequests.remove(requestId);
+ }
+
+ return nodePartitions;
+ }
+
+ private synchronized void requestMetadataNodeTakeover() {
+ //need a new node to takeover metadata node
+ ClusterPartition metadataPartiton = AsterixAppContextInfo.getInstance().getMetadataProperties()
+ .getMetadataPartition();
+ //request the metadataPartition node to register itself as the metadata node
+ TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+ ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+ .getCCApplicationContext().getMessageBroker();
+ try {
+ messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
+ } catch (Exception e) {
+ /**
+ * if we fail to send the request, it means the NC we tried to send the request to
+ * has failed. When the failure notification arrives, a new NC will be assigned to
+ * the metadata partition and a new metadata node takeover request will be sent to it.
+ */
+ LOGGER.warning("Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId());
+ e.printStackTrace();
+ }
+ }
+
+ public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
+ for (Integer partitonId : reponse.getPartitions()) {
+ ClusterPartition partition = clusterPartitions.get(partitonId);
+ partition.setActive(true);
+ partition.setActiveNodeId(reponse.getNodeId());
+ }
+ pendingTakeoverRequests.remove(reponse.getRequestId());
+ resetClusterPartitionConstraint();
+ updateClusterState();
+ }
+
+ public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
+ currentMetadataNode = reponse.getNodeId();
+ isMetadataNodeActive = true;
+ LOGGER.info("Current metadata node: " + currentMetadataNode);
+ updateClusterState();
+ }
+
+ public synchronized String getCurrentMetadataNode() {
+ return currentMetadataNode;
+ }
+
+ public boolean isAutoFailoverEnabled() {
+ if (cluster != null && cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) {
+ return cluster.getDataReplication().isAutoFailover();
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
similarity index 95%
rename from asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
rename to asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 8e020eb..be8f8e3 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -30,10 +30,10 @@
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.replication.management.NetworkingUtil;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
-public class AsterixReplicationProtocol {
+public class ReplicationProtocol {
/**
* All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
@@ -115,7 +115,7 @@
//read replication request type
NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
- ReplicationRequestType requestType = AsterixReplicationProtocol.ReplicationRequestType.values()[byteBuffer
+ ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer
.getInt()];
return requestType;
}
@@ -163,7 +163,7 @@
requestBuffer.flip();
}
- public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, AsterixLSMIndexFileProperties afp,
+ public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
ReplicationRequestType requestType) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutputStream oos = new DataOutputStream(outputStream);
@@ -183,10 +183,10 @@
return requestBuffer;
}
- public static AsterixLSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
+ public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
DataInputStream dis = new DataInputStream(bais);
- return AsterixLSMIndexFileProperties.create(dis);
+ return LSMIndexFileProperties.create(dis);
}
public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
@@ -335,12 +335,12 @@
* @throws IOException
*/
public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
- ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+ ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
}
public static void sendAck(SocketChannel socketChannel) throws IOException {
- ByteBuffer ackBuffer = AsterixReplicationProtocol.getAckBuffer();
+ ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
}
}
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 38be05e..a7cfaec 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -27,7 +27,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.management.ReplicationManager;
@@ -53,7 +53,7 @@
}
public void append(ILogRecord logRecord) {
- appendBuffer.putInt(AsterixReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+ appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
appendBuffer.putInt(logRecord.getSerializedLogSize());
appendBuffer.put(logRecord.getSerializedLog());
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
index 633d87a..9915c83 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
@@ -30,7 +30,7 @@
import org.apache.asterix.common.config.AsterixReplicationProperties;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
public class ReplicaEventNotifier implements Runnable {
@@ -61,7 +61,7 @@
ByteBuffer buffer = null;
try {
- buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+ buffer = ReplicationProtocol.writeReplicaEventRequest(event);
} catch (IOException e) {
e.printStackTrace();
}
@@ -79,7 +79,7 @@
//send replica event
connection.write(buffer);
//send goodbye
- connection.write(AsterixReplicationProtocol.getGoodbyeBuffer());
+ connection.write(ReplicationProtocol.getGoodbyeBuffer());
break;
} catch (IOException | UnresolvedAddressException e) {
try {
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
index 07ed144..0c94c61 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
@@ -28,7 +28,7 @@
import org.apache.asterix.common.config.AsterixReplicationProperties;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.Replica.ReplicaState;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
public class ReplicaStateChecker implements Callable<Void> {
@@ -61,7 +61,7 @@
connection = SocketChannel.open();
connection.configureBlocking(true);
connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
- ByteBuffer buffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+ ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
connection.write(buffer);
replicationManager.updateReplicaState(replica.getId(), ReplicaState.ACTIVE, suspendReplication);
return null;
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index e6b2ebf..c97fe94 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -42,7 +42,9 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -59,15 +61,15 @@
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
import org.apache.asterix.replication.functions.ReplicaLogsRequest;
import org.apache.asterix.replication.logging.RemoteLogMapping;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
@@ -210,7 +212,7 @@
public void run() {
Thread.currentThread().setName("Replication Thread");
try {
- ReplicationRequestType replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+ ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
inBuffer);
while (replicationFunction != ReplicationRequestType.GOODBYE) {
switch (replicationFunction) {
@@ -251,7 +253,7 @@
throw new IllegalStateException("Unknown replication request");
}
}
- replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel, inBuffer);
+ replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
}
} catch (Exception e) {
e.printStackTrace();
@@ -267,9 +269,9 @@
}
private void handleFlushIndex() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
//1. read which indexes are requested to be flushed from remote replica
- ReplicaIndexFlushRequest request = AsterixReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
+ ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
//2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
@@ -302,26 +304,25 @@
//the remaining indexes in the requested set are those which cannot be flushed.
//4. respond back to the requester that those indexes cannot be flushed
ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
- outBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
+ outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
}
private void handleLSMComponentProperties() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- LSMComponentProperties lsmCompProp = AsterixReplicationProtocol.readLSMPropertiesRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ LSMComponentProperties lsmCompProp = ReplicationProtocol.readLSMPropertiesRequest(inBuffer);
//create mask to indicate that this component is not valid yet
replicaResourcesManager.createRemoteLSMComponentMask(lsmCompProp);
lsmComponentId2PropertiesMap.put(lsmCompProp.getComponentId(), lsmCompProp);
}
private void handleReplicateFile() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- AsterixLSMIndexFileProperties afp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ LSMIndexFileProperties afp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
- String replicaFolderPath = replicaResourcesManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(),
- afp.getDataverse(), afp.getIdxName());
-
- String replicaFilePath = replicaFolderPath + File.separator + afp.getFileName();
+ //get index path
+ String indexPath = replicaResourcesManager.getIndexPath(afp);
+ String replicaFilePath = indexPath + File.separator + afp.getFileName();
//create file
File destFile = new File(replicaFilePath);
@@ -334,20 +335,20 @@
fileChannel.force(true);
if (afp.requiresAck()) {
- AsterixReplicationProtocol.sendAck(socketChannel);
+ ReplicationProtocol.sendAck(socketChannel);
}
if (afp.isLSMComponentFile()) {
- String compoentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath(), afp.getNodeId());
+ String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
if (afp.getLSNByteOffset() != IMetaDataPageManager.INVALID_LSN_OFFSET) {
- LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(compoentId,
+ LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId,
destFile.getAbsolutePath(), afp.getLSNByteOffset());
lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
} else {
- updateLSMComponentRemainingFiles(compoentId);
+ updateLSMComponentRemainingFiles(componentId);
}
} else {
//index metadata file
- replicaResourcesManager.initializeReplicaIndexLSNMap(replicaFolderPath, logManager.getAppendLSN());
+ replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
}
}
}
@@ -370,43 +371,48 @@
}
private void handleGetReplicaFiles() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- ReplicaFilesRequest request = AsterixReplicationProtocol.readReplicaFileRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
- AsterixLSMIndexFileProperties fileProperties = new AsterixLSMIndexFileProperties();
+ LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
List<String> filesList;
Set<String> replicaIds = request.getReplicaIds();
-
+ Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+ .getAppContext()).getMetadataProperties().getNodePartitions();
for (String replicaId : replicaIds) {
- filesList = replicaResourcesManager.getResourcesForReplica(replicaId);
+ //get replica partitions
+ ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
+ for (ClusterPartition partition : replicaPatitions) {
+ filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId());
- //start sending files
- for (String filePath : filesList) {
- try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
- FileChannel fileChannel = fromFile.getChannel();) {
- long fileSize = fileChannel.size();
- fileProperties.initialize(filePath, fileSize, replicaId, false,
- IMetaDataPageManager.INVALID_LSN_OFFSET, false);
- outBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
- ReplicationRequestType.REPLICATE_FILE);
+ //start sending files
+ for (String filePath : filesList) {
+ try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
+ FileChannel fileChannel = fromFile.getChannel();) {
+ long fileSize = fileChannel.size();
+ fileProperties.initialize(filePath, fileSize, replicaId, false,
+ IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+ outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer,
+ fileProperties, ReplicationRequestType.REPLICATE_FILE);
- //send file info
- NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+ //send file info
+ NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
- //transfer file
- NetworkingUtil.sendFile(fileChannel, socketChannel);
+ //transfer file
+ NetworkingUtil.sendFile(fileChannel, socketChannel);
+ }
}
}
}
//send goodbye (end of files)
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
private void handleGetRemoteLogs() throws IOException, ACIDException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- ReplicaLogsRequest request = AsterixReplicationProtocol.readReplicaLogsRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ ReplicaLogsRequest request = ReplicationProtocol.readReplicaLogsRequest(inBuffer);
Set<String> replicaIds = request.getReplicaIds();
long fromLSN = request.getFromLSN();
@@ -433,13 +439,13 @@
if (replicaIds.contains(logRecord.getNodeId()) && logRecord.getLogType() != LogType.FLUSH) {
if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
int requestSize = logRecord.getSerializedLogSize()
- + AsterixReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
+ + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
outBuffer = ByteBuffer.allocate(requestSize);
}
//set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
- AsterixReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
+ ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
}
logRecord = logReader.next();
@@ -449,32 +455,32 @@
}
//send goodbye (end of logs)
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
private void handleUpdateReplica() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- Replica replica = AsterixReplicationProtocol.readReplicaUpdateRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ Replica replica = ReplicationProtocol.readReplicaUpdateRequest(inBuffer);
replicationManager.updateReplicaInfo(replica);
}
private void handleReplicaEvent() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- ReplicaEvent event = AsterixReplicationProtocol.readReplicaEventRequest(inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
replicationManager.reportReplicaEvent(event);
}
private void handleDeleteFile() throws IOException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
- AsterixLSMIndexFileProperties fileProp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
- replicaResourcesManager.deleteRemoteFile(fileProp);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ LSMIndexFileProperties fileProp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
+ replicaResourcesManager.deleteIndexFile(fileProp);
if (fileProp.requiresAck()) {
- AsterixReplicationProtocol.sendAck(socketChannel);
+ ReplicationProtocol.sendAck(socketChannel);
}
}
private void handleLogReplication() throws IOException, ACIDException {
- inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
//Deserialize log
remoteLog.readRemoteLog(inBuffer, false, localNodeID);
@@ -518,7 +524,7 @@
//send ACK to requester
try {
socketChannel.socket().getOutputStream().write(
- (localNodeID + AsterixReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
+ (localNodeID + ReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
.getBytes());
socketChannel.socket().getOutputStream().flush();
} catch (IOException e) {
@@ -625,6 +631,7 @@
}
File destFile = new File(syncTask.getComponentFilePath());
+ //prepare local LSN buffer
ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
metadataBuffer.flip();
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 36e5dff..5c35df4 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -66,17 +66,16 @@
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
import org.apache.asterix.replication.functions.ReplicaLogsRequest;
import org.apache.asterix.replication.logging.ReplicationLogBuffer;
import org.apache.asterix.replication.logging.ReplicationLogFlusher;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
@@ -129,7 +128,8 @@
private ReplicationLogFlusher txnlogsReplicator;
private Future<? extends Object> txnLogReplicatorTask;
private Map<String, SocketChannel> logsReplicaSockets = null;
-
+ //TODO this class needs to be refactored by moving its private classes to separate files
+ //and possibly using MessageBroker to send/receive remote replicas events.
public ReplicationManager(String nodeId, AsterixReplicationProperties replicationProperties,
IReplicaResourcesManager remoteResoucesManager, ILogManager logManager,
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
@@ -255,7 +255,7 @@
throws IOException {
boolean isLSMComponentFile;
ByteBuffer responseBuffer = null;
- AsterixLSMIndexFileProperties asterixFileProperties = new AsterixLSMIndexFileProperties();
+ LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
if (requestBuffer == null) {
requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
}
@@ -277,7 +277,7 @@
//send LSMComponent properties
LSMComponentJob = (ILSMIndexReplicationJob) job;
LSMComponentProperties lsmCompProp = new LSMComponentProperties(LSMComponentJob, nodeId);
- requestBuffer = AsterixReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
+ requestBuffer = ReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
requestBuffer);
sendRequest(replicasSockets, requestBuffer);
}
@@ -310,7 +310,7 @@
IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
}
- requestBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer,
+ requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
asterixFileProperties, ReplicationRequestType.REPLICATE_FILE);
Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -350,7 +350,7 @@
remainingFiles--;
asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
- AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
+ ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
ReplicationRequestType.DELETE_FILE);
Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -392,13 +392,13 @@
private static ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer responseBuffer)
throws IOException {
if (responseBuffer == null) {
- responseBuffer = ByteBuffer.allocate(AsterixReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
+ responseBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
} else {
responseBuffer.clear();
}
//read response from remote replicas
- ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+ ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
responseBuffer);
return responseFunction;
}
@@ -519,7 +519,7 @@
node.setClusterIp(newAddress);
Replica replica = new Replica(node);
- ByteBuffer buffer = AsterixReplicationProtocol.writeUpdateReplicaRequest(replica);
+ ByteBuffer buffer = ReplicationProtocol.writeUpdateReplicaRequest(replica);
Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
sendRequest(replicaSockets, buffer);
closeReplicaSockets(replicaSockets);
@@ -537,7 +537,7 @@
node.setClusterIp(NetworkingUtil.getHostAddress(hostIPAddressFirstOctet));
Replica replica = new Replica(node);
ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.SHUTDOWN);
- ByteBuffer buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+ ByteBuffer buffer = ReplicationProtocol.writeReplicaEventRequest(event);
Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
sendRequest(replicaSockets, buffer);
closeReplicaSockets(replicaSockets);
@@ -581,7 +581,7 @@
*/
private void closeReplicaSockets(Map<String, SocketChannel> replicaSockets) {
//send goodbye
- ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+ ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
sendRequest(replicaSockets, goodbyeBuffer);
Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
@@ -910,7 +910,7 @@
ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
for (String replicaId : replicaIds) {
//1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
- HashMap<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
+ Map<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
nonSharpCheckpointTargetLSN);
if (laggingIndexes.size() > 0) {
@@ -919,7 +919,7 @@
try (SocketChannel socketChannel = getReplicaSocket(replicaId)) {
ReplicaIndexFlushRequest laggingIndexesRequest = new ReplicaIndexFlushRequest(
laggingIndexes.keySet());
- requestBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
+ requestBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
laggingIndexesRequest);
NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer);
@@ -927,19 +927,19 @@
ReplicationRequestType responseFunction = waitForResponse(socketChannel, requestBuffer);
if (responseFunction == ReplicationRequestType.FLUSH_INDEX) {
- requestBuffer = AsterixReplicationProtocol.readRequest(socketChannel, requestBuffer);
+ requestBuffer = ReplicationProtocol.readRequest(socketChannel, requestBuffer);
//returning the indexes that were not flushed
- laggingIndexesResponse = AsterixReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
+ laggingIndexesResponse = ReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
}
//send goodbye
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
//4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend.
if (laggingIndexesResponse != null) {
for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
String indexPath = laggingIndexes.get(resouceId);
- HashMap<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
+ Map<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN);
replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap);
}
@@ -953,7 +953,7 @@
public long getMaxRemoteLSN(Set<String> remoteReplicas) throws IOException {
long maxRemoteLSN = 0;
- AsterixReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
+ ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>();
try {
for (String replicaId : remoteReplicas) {
@@ -988,26 +988,26 @@
@Override
public void requestReplicaFiles(String selectedReplicaId, Set<String> replicasDataToRecover) throws IOException {
ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover);
- AsterixReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
+ ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) {
//transfer request
NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
- String destFolder;
+ String indexPath;
String destFilePath;
-
- ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+ ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
dataBuffer);
- AsterixLSMIndexFileProperties fileProperties;
+ LSMIndexFileProperties fileProperties;
while (responseFunction != ReplicationRequestType.GOODBYE) {
- dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+ dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
- fileProperties = AsterixReplicationProtocol.readFileReplicationRequest(dataBuffer);
- destFolder = replicaResourcesManager.getIndexPath(fileProperties.getNodeId(),
- fileProperties.getIoDeviceNum(), fileProperties.getDataverse(), fileProperties.getIdxName());
- destFilePath = destFolder + File.separator + fileProperties.getFileName();
+ fileProperties = ReplicationProtocol.readFileReplicationRequest(dataBuffer);
+
+ //get index path
+ indexPath = replicaResourcesManager.getIndexPath(fileProperties);
+ destFilePath = indexPath + File.separator + fileProperties.getFileName();
//create file
File destFile = new File(destFilePath);
@@ -1024,14 +1024,14 @@
//we need to create LSN map for .metadata files that belong to remote replicas
if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) {
//replica index
- replicaResourcesManager.initializeReplicaIndexLSNMap(destFolder, logManager.getAppendLSN());
+ replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
}
- responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+ responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
}
//send goodbye
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
}
@@ -1039,7 +1039,7 @@
@Override
public long requestReplicaMinLSN(String selectedReplicaId) throws IOException {
long minLSN = 0;
- AsterixReplicationProtocol.writeMinLSNRequest(dataBuffer);
+ ReplicationProtocol.writeMinLSNRequest(dataBuffer);
try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId);) {
//transfer request
NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
@@ -1049,7 +1049,7 @@
minLSN = dataBuffer.getLong();
//send goodbye
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
}
return minLSN;
@@ -1060,19 +1060,19 @@
public ArrayList<ILogRecord> requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN)
throws IOException, ACIDException {
ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN);
- dataBuffer = AsterixReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
+ dataBuffer = ReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) {
//transfer request
NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
//read response type
- ReplicationRequestType responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+ ReplicationRequestType responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
ArrayList<ILogRecord> recoveryLogs = new ArrayList<ILogRecord>();
ILogRecord logRecord = new LogRecord();
while (responseType != ReplicationRequestType.GOODBYE) {
- dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+ dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
logRecord.readRemoteLog(dataBuffer, true, nodeId);
if (logRecord.getNodeId().equals(nodeId)) {
@@ -1085,11 +1085,11 @@
logManager.log(logRecord);
}
- responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+ responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
}
//send goodbye
- AsterixReplicationProtocol.sendGoodbye(socketChannel);
+ ReplicationProtocol.sendGoodbye(socketChannel);
return recoveryLogs;
}
}
@@ -1136,11 +1136,7 @@
updateReplicaState(replicaId, ReplicaState.DEAD, true);
//delete any invalid LSMComponents for this replica
- try {
- replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
- } catch (HyracksDataException e) {
- e.printStackTrace();
- }
+ replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
}
public void handleShutdownEvent(String replicaId) {
@@ -1237,8 +1233,8 @@
break;
}
//read ACK for job commit log
- String replicaId = AsterixReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
- int jobId = AsterixReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
+ String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
+ int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
addAckToJob(jobId, replicaId);
}
} catch (AsynchronousCloseException e1) {
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index a82b535..ee987f8 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.replication.recovery;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,10 +31,12 @@
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
public class RemoteRecoveryManager implements IRemoteRecoveryManager {
@@ -55,13 +58,20 @@
@Override
public void performRemoteRecovery() {
+ //TODO this method needs to be adapted to perform failback when autoFailover is enabled.
+ //Currently we will not allow a node to perform remote recovery since another replica
+ //already tookover its workload and might not resync correctly if there are on on-going
+ //jobs on the replica.
+ if (AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled()) {
+ throw new IllegalStateException("Cannot perform remote recovery when auto failover is enabled.");
+ }
//The whole remote recovery process should be atomic.
//Any error happens, we should start the recovery from the start until the recovery is complete or an illegal state is reached (cannot recovery).
int maxRecoveryAttempts = 10;
PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
.getLocalResourceRepository();
while (true) {
- //start recovery recovery steps
+ //start recovery steps
try {
maxRecoveryAttempts--;
@@ -76,7 +86,7 @@
int activeReplicasCount = replicationManager.getActiveReplicasCount();
if (activeReplicasCount == 0) {
- throw new IllegalStateException("no ACTIVE remote replica(s) exists to performe remote recovery");
+ throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
}
//2. clean any memory data that could've existed from previous failed recovery attempt
@@ -90,8 +100,7 @@
Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
//5. get max LSN from selected remote replicas
- long maxRemoteLSN = 0;
- maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
+ long maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
//6. force LogManager to start from a partition > maxLSN in selected remote replicas
logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
@@ -107,8 +116,7 @@
//2. Initialize local resources based on the newly received files (if we are recovering the primary replica on this node)
if (replicasDataToRecover.contains(logManager.getNodeId())) {
((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
- .initializeNewUniverse(
- runtimeContext.getReplicaResourcesManager().getLocalStorageFolder());
+ .initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
//initialize resource id factor to correct max resource id
runtimeContext.initializeResourceIdFactory();
}
@@ -140,7 +148,6 @@
}
private Map<String, Set<String>> constructRemoteRecoveryPlan() {
-
//1. identify which replicas reside in this node
String localNodeId = logManager.getNodeId();
Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId);
@@ -205,4 +212,14 @@
return recoveryList;
}
+
+ @Override
+ public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException {
+ long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
+ //reply logs > minLSN that belong to these partitions
+ //TODO (mhubail) currently we assume the logs for these partitions belong to the failed node
+ //this needs to be updated once log formats are updated to include the partition id
+ runtimeContext.getTransactionSubsystem().getRecoveryManager().replayPartitionsLogs(partitions, minLSN,
+ failedNode);
+ }
}
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
deleted file mode 100644
index 67b39c4..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-
-public class AsterixFilesUtil {
-
- public static void deleteFolder(String folderPath) throws IOException {
- File folder = new File(folderPath);
- if (folder.exists()) {
- //delete files inside the folder
- while (deleteDirecotryFiles(folderPath) != true) {
- //if there is a file being written (e.g. LSM Component), wait and try again to delete the file
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- //ignore
- }
- }
-
- //delete the folder itself
- folder.delete();
- }
- }
-
- private static boolean deleteDirecotryFiles(String dirPath) throws IOException {
- try {
- Path directory = Paths.get(dirPath);
- Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
- Files.delete(file);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
- Files.delete(dir);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
- return FileVisitResult.CONTINUE;
- }
-
- });
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 794a6e1..841a99f 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -47,9 +47,10 @@
public LSMComponentProperties(ILSMIndexReplicationJob job, String nodeId) {
this.nodeId = nodeId;
- componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0], nodeId);
+ componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]);
numberOfFiles = new AtomicInteger(job.getJobFiles().size());
- originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext());
+ originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
+ job.getLSMIndexOperationContext());
opType = job.getLSMOpType();
}
@@ -94,7 +95,7 @@
public String getMaskPath(ReplicaResourcesManager resourceManager) {
if (maskPath == null) {
- AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this);
+ LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName()
+ ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
}
@@ -103,9 +104,8 @@
public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) {
if (replicaPath == null) {
- AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this);
- replicaPath = resourceManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(),
- afp.getIdxName());
+ LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
+ replicaPath = resourceManager.getIndexPath(afp);
}
return replicaPath;
}
@@ -113,27 +113,24 @@
/***
* @param filePath
* any file of the LSM component
- * @param nodeId
* @return a unique id based on the timestamp of the component
*/
- public static String getLSMComponentID(String filePath, String nodeId) {
+ public static String getLSMComponentID(String filePath) {
String[] tokens = filePath.split(File.separator);
int arraySize = tokens.length;
String fileName = tokens[arraySize - 1];
- String ioDevoceName = tokens[arraySize - 2];
- String idxName = tokens[arraySize - 3];
- String dataverse = tokens[arraySize - 4];
+ String idxName = tokens[arraySize - 2];
+ String dataverse = tokens[arraySize - 3];
+ String partitionName = tokens[arraySize - 4];
StringBuilder componentId = new StringBuilder();
- componentId.append(nodeId);
+ componentId.append(partitionName);
componentId.append(File.separator);
componentId.append(dataverse);
componentId.append(File.separator);
componentId.append(idxName);
componentId.append(File.separator);
- componentId.append(ioDevoceName);
- componentId.append(File.separator);
componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.SPLIT_STRING)));
return componentId.toString();
}
@@ -142,18 +139,10 @@
return componentId;
}
- public void setComponentId(String componentId) {
- this.componentId = componentId;
- }
-
public long getOriginalLSN() {
return originalLSN;
}
- public void setOriginalLSN(long lSN) {
- originalLSN = lSN;
- }
-
public String getNodeId() {
return nodeId;
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
similarity index 69%
rename from asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
rename to asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index 5e0c9b0..890d3a2 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -24,31 +24,31 @@
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-public class AsterixLSMIndexFileProperties {
+public class LSMIndexFileProperties {
private String fileName;
private long fileSize;
private String nodeId;
private String dataverse;
- private int ioDeviceNum;
private String idxName;
private boolean lsmComponentFile;
private String filePath;
private boolean requiresAck = false;
private long LSNByteOffset;
+ private int partition;
- public AsterixLSMIndexFileProperties() {
+ public LSMIndexFileProperties() {
}
- public AsterixLSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
+ public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
long LSNByteOffset, boolean requiresAck) {
initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
}
- public AsterixLSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
+ public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
IMetaDataPageManager.INVALID_LSN_OFFSET, false);
}
@@ -58,19 +58,22 @@
this.filePath = filePath;
this.fileSize = fileSize;
this.nodeId = nodeId;
- String[] tokens = filePath.split(File.separator);
- int arraySize = tokens.length;
- this.fileName = tokens[arraySize - 1];
- this.ioDeviceNum = getDeviceIONumFromName(tokens[arraySize - 2]);
- this.idxName = tokens[arraySize - 3];
- this.dataverse = tokens[arraySize - 4];
this.lsmComponentFile = lsmComponentFile;
this.LSNByteOffset = LSNByteOffset;
this.requiresAck = requiresAck;
}
- public static int getDeviceIONumFromName(String name) {
- return Integer.parseInt(name.substring(IndexFileNameUtil.IO_DEVICE_NAME_PREFIX.length()));
+ public void splitFileName() {
+ String[] tokens = filePath.split(File.separator);
+ int arraySize = tokens.length;
+ this.fileName = tokens[arraySize - 1];
+ this.idxName = tokens[arraySize - 2];
+ this.dataverse = tokens[arraySize - 3];
+ this.partition = getPartitonNumFromName(tokens[arraySize - 4]);
+ }
+
+ private static int getPartitonNumFromName(String name) {
+ return Integer.parseInt(name.substring(StoragePathUtil.PARTITION_DIR_PREFIX.length()));
}
public void serialize(OutputStream out) throws IOException {
@@ -83,15 +86,15 @@
dos.writeBoolean(requiresAck);
}
- public static AsterixLSMIndexFileProperties create(DataInput input) throws IOException {
+ public static LSMIndexFileProperties create(DataInput input) throws IOException {
String nodeId = input.readUTF();
String filePath = input.readUTF();
long fileSize = input.readLong();
boolean lsmComponentFile = input.readBoolean();
long LSNByteOffset = input.readLong();
boolean requiresAck = input.readBoolean();
- AsterixLSMIndexFileProperties fileProp = new AsterixLSMIndexFileProperties();
- fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
+ LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile,
+ LSNByteOffset, requiresAck);
return fileProp;
}
@@ -99,10 +102,6 @@
return filePath;
}
- public void setFilePath(String filePath) {
- this.filePath = filePath;
- }
-
public long getFileSize() {
return fileSize;
}
@@ -111,10 +110,6 @@
return fileName;
}
- public void setFileName(String fileName) {
- this.fileName = fileName;
- }
-
public String getNodeId() {
return nodeId;
}
@@ -131,49 +126,25 @@
this.dataverse = dataverse;
}
- public void setFileSize(long fileSize) {
- this.fileSize = fileSize;
- }
-
- public int getIoDeviceNum() {
- return ioDeviceNum;
- }
-
- public void setIoDeviceNum(int ioDevoceNum) {
- this.ioDeviceNum = ioDevoceNum;
- }
-
public String getIdxName() {
return idxName;
}
- public void setIdxName(String idxName) {
- this.idxName = idxName;
- }
-
public boolean isLSMComponentFile() {
return lsmComponentFile;
}
- public void setLsmComponentFile(boolean lsmComponentFile) {
- this.lsmComponentFile = lsmComponentFile;
- }
-
public boolean requiresAck() {
return requiresAck;
}
- public void setRequiresAck(boolean requiresAck) {
- this.requiresAck = requiresAck;
- }
-
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("File Name: " + fileName + " ");
sb.append("File Size: " + fileSize + " ");
sb.append("Node ID: " + nodeId + " ");
- sb.append("I/O Device: " + ioDeviceNum + " ");
+ sb.append("Partition: " + partition + " ");
sb.append("IDX Name: " + idxName + " ");
sb.append("isLSMComponentFile : " + lsmComponentFile + " ");
sb.append("Dataverse: " + dataverse);
@@ -185,7 +156,7 @@
return LSNByteOffset;
}
- public void setLSNByteOffset(long lSNByteOffset) {
- LSNByteOffset = lSNByteOffset;
+ public int getPartition() {
+ return partition;
}
}
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 3e3043c..b9f7506 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,91 +38,65 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
import org.apache.hyracks.storage.common.file.LocalResource;
public class ReplicaResourcesManager implements IReplicaResourcesManager {
private static final Logger LOGGER = Logger.getLogger(ReplicaResourcesManager.class.getName());
- private final String[] mountPoints;
- private final int numIODevices;
- private static final String REPLICA_FOLDER_SUFFIX = "_replica";
- private final String replicationStorageFolder;
- public final String localStorageFolder;
- private final String localNodeID;
public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask";
private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP";
public static final long REPLICA_INDEX_CREATION_LSN = -1;
private final AtomicLong lastMinRemoteLSN;
+ private final PersistentLocalResourceRepository localRepository;
+ private final Map<String, ClusterPartition[]> nodePartitions;
- public ReplicaResourcesManager(List<IODeviceHandle> devices, String localStorageFolder, String localNodeID,
- String replicationStorageFolder) throws HyracksDataException {
- numIODevices = devices.size();
- this.mountPoints = new String[numIODevices];
- for (int i = 0; i < numIODevices; i++) {
- String mountPoint = devices.get(i).getPath().getPath();
- File mountPointDir = new File(mountPoint);
- if (!mountPointDir.exists()) {
- throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
- }
- if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
- mountPoints[i] = new String(mountPoint + System.getProperty("file.separator"));
- } else {
- mountPoints[i] = new String(mountPoint);
- }
- }
- this.localStorageFolder = localStorageFolder;
- this.localNodeID = localNodeID;
- this.replicationStorageFolder = replicationStorageFolder;
+ public ReplicaResourcesManager(ILocalResourceRepository localRepository,
+ AsterixMetadataProperties metadataProperties) {
+ this.localRepository = (PersistentLocalResourceRepository) localRepository;
+ nodePartitions = metadataProperties.getNodePartitions();
lastMinRemoteLSN = new AtomicLong(-1);
}
- @Override
- public String getLocalStorageFolder() {
- return localStorageFolder;
- }
-
- private String getReplicaStorageFolder(String replicaId, int IODeviceNum) {
- if (replicaId.equals(localNodeID)) {
- return mountPoints[IODeviceNum] + localStorageFolder;
- } else {
- return mountPoints[IODeviceNum] + replicationStorageFolder + File.separator + replicaId
- + REPLICA_FOLDER_SUFFIX;
- }
- }
-
- public void deleteRemoteFile(AsterixLSMIndexFileProperties afp) throws IOException {
- String indexPath = getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(), afp.getIdxName());
+ public void deleteIndexFile(LSMIndexFileProperties afp) {
+ String indexPath = getIndexPath(afp);
if (indexPath != null) {
if (afp.isLSMComponentFile()) {
String backupFilePath = indexPath + File.separator + afp.getFileName();
//delete file
File destFile = new File(backupFilePath);
- if (destFile.exists()) {
- destFile.delete();
- }
+ FileUtils.deleteQuietly(destFile);
} else {
//delete index files
indexPath = indexPath.substring(0, indexPath.lastIndexOf(File.separator));
- AsterixFilesUtil.deleteFolder(indexPath);
+ FileUtils.deleteQuietly(new File(indexPath));
}
}
}
- public String getIndexPath(String replicaId, int IODeviceNum, String dataverse, String dataset) {
- //mounting point/backupNodeId_replica/Dataverse/Dataset/device_id_#/
- String remoteIndexFolderPath = getReplicaStorageFolder(replicaId, IODeviceNum) + File.separator + dataverse
- + File.separator + dataset + File.separator + IndexFileNameUtil.IO_DEVICE_NAME_PREFIX + IODeviceNum;
- Path path = Paths.get(remoteIndexFolderPath);
+ public String getIndexPath(LSMIndexFileProperties fileProperties) {
+ fileProperties.splitFileName();
+ //get partition path in this node
+ String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition());
+ //get index path
+ String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(),
+ fileProperties.getDataverse(), fileProperties.getIdxName());
+
+ Path path = Paths.get(indexPath);
if (!Files.exists(path)) {
- File indexFolder = new File(remoteIndexFolderPath);
+ File indexFolder = new File(indexPath);
indexFolder.mkdirs();
}
- return remoteIndexFolderPath;
+ return indexPath;
}
public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException {
@@ -144,14 +118,10 @@
//remove mask to mark component as valid
String maskPath = lsmComponentProperties.getMaskPath(this);
Path path = Paths.get(maskPath);
-
- if (Files.exists(path)) {
- File maskFile = new File(maskPath);
- maskFile.delete();
- }
+ Files.deleteIfExists(path);
//add component LSN to the index LSNs map
- HashMap<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
+ Map<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
lsnMap.put(lsmComponentProperties.getOriginalLSN(), lsmComponentProperties.getReplicaLSN());
//update map on disk
@@ -159,73 +129,11 @@
}
- public List<String> getResourcesForReplica(String nodeId) throws HyracksDataException {
- List<String> resourcesList = new ArrayList<String>();
- String rootFolder;
- for (int i = 0; i < numIODevices; i++) {
- rootFolder = getReplicaStorageFolder(nodeId, i);
- File rootDirFile = new File(rootFolder);
- if (!rootDirFile.exists()) {
- continue;
- }
-
- File[] dataverseFileList = rootDirFile.listFiles();
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] ioDevicesList = indexFile.listFiles();
- if (ioDevicesList != null) {
- for (File ioDeviceFile : ioDevicesList) {
- if (ioDeviceFile.isDirectory()) {
- File[] metadataFiles = ioDeviceFile.listFiles(LSM_INDEX_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- resourcesList.add(metadataFile.getAbsolutePath());
- }
- }
- }
- }
- }
- }
- }
- }
- }
- }
- }
- return resourcesList;
- }
-
- public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException {
+ public Set<File> getReplicaIndexes(String replicaId) {
Set<File> remoteIndexesPaths = new HashSet<File>();
- for (int i = 0; i < numIODevices; i++) {
- String rootReplicaFolder = getReplicaStorageFolder(replicaId, i);
- File rootDirFile = new File(rootReplicaFolder);
- if (!rootDirFile.exists()) {
- continue;
- }
- File[] dataverseFileList = rootDirFile.listFiles();
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- File[] ioDevicesList = indexFile.listFiles();
- if (ioDevicesList != null) {
- for (File ioDeviceFile : ioDevicesList) {
- if (ioDeviceFile.isDirectory()) {
- remoteIndexesPaths.add(ioDeviceFile);
- }
- }
- }
- }
- }
- }
- }
- }
+ ClusterPartition[] partitions = nodePartitions.get(replicaId);
+ for (ClusterPartition partition : partitions) {
+ remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId()));
}
return remoteIndexesPaths;
}
@@ -237,41 +145,60 @@
}
long minRemoteLSN = Long.MAX_VALUE;
for (String replica : replicaIds) {
- try {
- //for every index in replica
- Set<File> remoteIndexes = getReplicaIndexes(replica);
- for (File indexFolder : remoteIndexes) {
- //read LSN map
- try {
- //get max LSN per index
- long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
+ //for every index in replica
+ Set<File> remoteIndexes = getReplicaIndexes(replica);
+ for (File indexFolder : remoteIndexes) {
+ //read LSN map
+ try {
+ //get max LSN per index
+ long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
- //get min of all maximums
- minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
- } catch (IOException e) {
- LOGGER.log(Level.INFO, indexFolder.getAbsolutePath() + " Couldn't read LSN map for index "
- + indexFolder);
- continue;
- }
+ //get min of all maximums
+ minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+ } catch (IOException e) {
+ LOGGER.log(Level.INFO,
+ indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
+ continue;
}
- } catch (HyracksDataException e) {
- e.printStackTrace();
}
}
lastMinRemoteLSN.set(minRemoteLSN);
return minRemoteLSN;
}
- public HashMap<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN)
- throws IOException {
- HashMap<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
+ @Override
+ public long getPartitionsMinLSN(Integer[] partitions) {
+ long minRemoteLSN = Long.MAX_VALUE;
+ for (Integer partition : partitions) {
+ //for every index in replica
+ Set<File> remoteIndexes = getPartitionIndexes(partition);
+ for (File indexFolder : remoteIndexes) {
+ //read LSN map
+ try {
+ //get max LSN per index
+ long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
+
+ //get min of all maximums
+ minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+ } catch (IOException e) {
+ LOGGER.log(Level.INFO,
+ indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
+ continue;
+ }
+ }
+ }
+ return minRemoteLSN;
+ }
+
+ public Map<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) throws IOException {
+ Map<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
try {
//for every index in replica
Set<File> remoteIndexes = getReplicaIndexes(replicaId);
for (File indexFolder : remoteIndexes) {
if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
- File localResource = new File(indexFolder + File.separator
- + PersistentLocalResourceRepository.METADATA_FILE_NAME);
+ File localResource = new File(
+ indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME);
LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
laggingReplicaIndexes.put(resource.getResourceId(), indexFolder.getAbsolutePath());
}
@@ -286,7 +213,7 @@
private long getReplicaIndexMaxLSN(File indexFolder) throws IOException {
long remoteIndexMaxLSN = 0;
//get max LSN per index
- HashMap<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
+ Map<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
if (lsnMap != null) {
for (Long lsn : lsnMap.values()) {
remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn);
@@ -296,7 +223,7 @@
return remoteIndexMaxLSN;
}
- public void cleanInvalidLSMComponents(String replicaId) throws HyracksDataException {
+ public void cleanInvalidLSMComponents(String replicaId) {
//for every index in replica
Set<File> remoteIndexes = getReplicaIndexes(replicaId);
for (File remoteIndexFile : remoteIndexes) {
@@ -312,7 +239,7 @@
}
}
- private void deleteLSMComponentFilesForMask(File maskFile) {
+ private static void deleteLSMComponentFilesForMask(File maskFile) {
String lsmComponentTimeStamp = maskFile.getName().substring(0,
maskFile.getName().length() - LSM_COMPONENT_MASK_SUFFIX.length());
File indexFolder = maskFile.getParentFile();
@@ -325,78 +252,92 @@
}
}
- @SuppressWarnings("unchecked")
- public synchronized HashMap<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
- FileInputStream fis = null;
- ObjectInputStream oisFromFis = null;
- try {
- fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
- oisFromFis = new ObjectInputStream(fis);
+ @SuppressWarnings({ "unchecked" })
+ public synchronized Map<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
+ try (FileInputStream fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
+ ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
Map<Long, Long> lsnMap = null;
try {
lsnMap = (Map<Long, Long>) oisFromFis.readObject();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
- return (HashMap<Long, Long>) lsnMap;
- } finally {
- if (oisFromFis != null) {
- oisFromFis.close();
- }
- if (oisFromFis == null && fis != null) {
- fis.close();
- }
+ return lsnMap;
}
}
- public synchronized void updateReplicaIndexLSNMap(String indexPath, HashMap<Long, Long> lsnMap) throws IOException {
- FileOutputStream fos = null;
- ObjectOutputStream oosToFos = null;
- try {
- fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
- oosToFos = new ObjectOutputStream(fos);
+ public synchronized void updateReplicaIndexLSNMap(String indexPath, Map<Long, Long> lsnMap) throws IOException {
+ try (FileOutputStream fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
+ ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
oosToFos.writeObject(lsnMap);
oosToFos.flush();
lastMinRemoteLSN.set(-1);
- } finally {
- if (oosToFos != null) {
- oosToFos.close();
- }
- if (oosToFos == null && fos != null) {
- fos.close();
+ }
+ }
+
+ /**
+ * @param partition
+ * @return Set of file references to each index in the partition
+ */
+ public Set<File> getPartitionIndexes(int partition) {
+ Set<File> partitionIndexes = new HashSet<File>();
+ String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+ String partitionStoragePath = localRepository.getPartitionPath(partition)
+ + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition);
+ File partitionRoot = new File(partitionStoragePath);
+ if (partitionRoot.exists() && partitionRoot.isDirectory()) {
+ File[] dataverseFileList = partitionRoot.listFiles();
+ if (dataverseFileList != null) {
+ for (File dataverseFile : dataverseFileList) {
+ if (dataverseFile.isDirectory()) {
+ File[] indexFileList = dataverseFile.listFiles();
+ if (indexFileList != null) {
+ for (File indexFile : indexFileList) {
+ partitionIndexes.add(indexFile);
+ }
+ }
+ }
+ }
}
}
+ return partitionIndexes;
+ }
+
+ /**
+ * @param partition
+ * @return Absolute paths to all partition files
+ */
+ public List<String> getPartitionIndexesFiles(int partition) {
+ List<String> partitionFiles = new ArrayList<String>();
+ Set<File> partitionIndexes = getPartitionIndexes(partition);
+ for (File indexDir : partitionIndexes) {
+ if (indexDir.isDirectory()) {
+ File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+ if (indexFiles != null) {
+ for (File file : indexFiles) {
+ partitionFiles.add(file.getAbsolutePath());
+ }
+ }
+ }
+ }
+ return partitionFiles;
}
private static final FilenameFilter LSM_COMPONENTS_MASKS_FILTER = new FilenameFilter() {
public boolean accept(File dir, String name) {
- if (name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) {
- return true;
- } else {
- return false;
- }
+ return name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
}
};
private static final FilenameFilter LSM_COMPONENTS_NON_MASKS_FILTER = new FilenameFilter() {
public boolean accept(File dir, String name) {
- if (!name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) {
- return true;
- } else {
- return false;
- }
+ return !name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
}
};
private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() {
public boolean accept(File dir, String name) {
- if (name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME)) {
- return true;
- } else if (!name.startsWith(".")) {
- return true;
- } else {
- return false;
- }
+ return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith(".");
}
};
}
\ No newline at end of file
diff --git a/asterix-replication/src/test/resources/data/fbu.adm b/asterix-replication/src/test/resources/data/fbu.adm
new file mode 100644
index 0000000..7e99ea4
--- /dev/null
+++ b/asterix-replication/src/test/resources/data/fbu.adm
@@ -0,0 +1,10 @@
+{"id":1,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]}
+{"id":2,"alias":"Isbel","name":"IsbelDull","user-since":datetime("2011-01-22T10:10:00"),"friend-ids":{{1,4}},"employment":[{"organization-name":"Hexviafind","start-date":date("2010-04-27")}]}
+{"id":3,"alias":"Emory","name":"EmoryUnk","user-since":datetime("2012-07-10T10:10:00"),"friend-ids":{{1,5,8,9}},"employment":[{"organization-name":"geomedia","start-date":date("2010-06-17"),"end-date":date("2010-01-26")}]}
+{"id":4,"alias":"Nicholas","name":"NicholasStroh","user-since":datetime("2010-12-27T10:10:00"),"friend-ids":{{2}},"employment":[{"organization-name":"Zamcorporation","start-date":date("2010-06-08")}]}
+{"id":5,"alias":"Von","name":"VonKemble","user-since":datetime("2010-01-05T10:10:00"),"friend-ids":{{3,6,10}},"employment":[{"organization-name":"Kongreen","start-date":date("2010-11-27")}]}
+{"id":6,"alias":"Willis","name":"WillisWynne","user-since":datetime("2005-01-17T10:10:00"),"friend-ids":{{1,3,7}},"employment":[{"organization-name":"jaydax","start-date":date("2009-05-15")}]}
+{"id":7,"alias":"Suzanna","name":"SuzannaTillson","user-since":datetime("2012-08-07T10:10:00"),"friend-ids":{{6}},"employment":[{"organization-name":"Labzatron","start-date":date("2011-04-19")}]}
+{"id":8,"alias":"Nila","name":"NilaMilliron","user-since":datetime("2008-01-01T10:10:00"),"friend-ids":{{3}},"employment":[{"organization-name":"Plexlane","start-date":date("2010-02-28")}]}
+{"id":9,"alias":"Woodrow","name":"WoodrowNehling","user-since":datetime("2005-09-20T10:10:00"),"friend-ids":{{3,10}},"employment":[{"organization-name":"Zuncan","start-date":date("2003-04-22"),"end-date":date("2009-12-13")}]}
+{"id":10,"alias":"Bram","name":"BramHatch","user-since":datetime("2010-10-16T10:10:00"),"friend-ids":{{1,5,9}},"employment":[{"organization-name":"physcane","start-date":date("2007-06-05"),"end-date":date("2011-11-05")}]}
diff --git a/asterix-replication/src/test/resources/scripts/delete_storage.sh b/asterix-replication/src/test/resources/scripts/delete_storage.sh
new file mode 100755
index 0000000..129030b
--- /dev/null
+++ b/asterix-replication/src/test/resources/scripts/delete_storage.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+rm -rf ~/asterix/*
diff --git a/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
new file mode 100755
index 0000000..2582713
--- /dev/null
+++ b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+jps | awk '{if ($2 == "NCDriver" || $2 == "CCDriver") print $1;}' | xargs -n 1 kill -9
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 52fd806..9f59148 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -32,9 +32,12 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.SortedMap;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.replication.AsterixReplicationJob;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.commons.io.FileUtils;
@@ -63,10 +66,13 @@
private IReplicationManager replicationManager;
private boolean isReplicationEnabled = false;
private Set<String> filesToBeReplicated;
+ private final SortedMap<Integer, ClusterPartition> clusterPartitions;
- public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
+ public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
+ AsterixMetadataProperties metadataProperties) throws HyracksDataException {
mountPoints = new String[devices.size()];
this.nodeId = nodeId;
+ this.clusterPartitions = metadataProperties.getClusterPartitions();
for (int i = 0; i < mountPoints.length; i++) {
String mountPoint = devices.get(i).getPath().getPath();
File mountPointDir = new File(mountPoint);
@@ -156,37 +162,18 @@
resourceCache.put(resource.getResourcePath(), resource);
}
- FileOutputStream fos = null;
- ObjectOutputStream oosToFos = null;
-
- try {
- fos = new FileOutputStream(resourceFile);
- oosToFos = new ObjectOutputStream(fos);
+ try (FileOutputStream fos = new FileOutputStream(resourceFile);
+ ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
oosToFos.writeObject(resource);
oosToFos.flush();
} catch (IOException e) {
throw new HyracksDataException(e);
- } finally {
- if (oosToFos != null) {
- try {
- oosToFos.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- if (oosToFos == null && fos != null) {
- try {
- fos.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
+ }
- //if replication enabled, send resource metadata info to remote nodes
- if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
- String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
- createReplicationJob(ReplicationOperation.REPLICATE, filePath);
- }
+ //if replication enabled, send resource metadata info to remote nodes
+ if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
+ String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
+ createReplicationJob(ReplicationOperation.REPLICATE, filePath);
}
}
@@ -304,31 +291,12 @@
}
public static LocalResource readLocalResource(File file) throws HyracksDataException {
- FileInputStream fis = null;
- ObjectInputStream oisFromFis = null;
-
- try {
- fis = new FileInputStream(file);
- oisFromFis = new ObjectInputStream(fis);
+ try (FileInputStream fis = new FileInputStream(file);
+ ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
LocalResource resource = (LocalResource) oisFromFis.readObject();
return resource;
} catch (Exception e) {
throw new HyracksDataException(e);
- } finally {
- if (oisFromFis != null) {
- try {
- oisFromFis.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- if (oisFromFis == null && fis != null) {
- try {
- fis.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
}
}
@@ -427,4 +395,13 @@
}
return storageRootDir;
}
+
+ /**
+ * @param partition
+ * @return The partition local path on this NC.
+ */
+ public String getPartitionPath(int partition) {
+ //currently each partition is replicated on the same IO device number on all NCs.
+ return mountPoints[clusterPartitions.get(partition).getIODeviceNum()];
+ }
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index b6bb7dc..e79a0d3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.transaction.management.resource;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
@@ -26,14 +27,17 @@
public class PersistentLocalResourceRepositoryFactory implements ILocalResourceRepositoryFactory {
private final IIOManager ioManager;
private final String nodeId;
+ private final AsterixMetadataProperties metadataProperties;
- public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId) {
+ public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId,
+ AsterixMetadataProperties metadataProperties) {
this.ioManager = ioManager;
this.nodeId = nodeId;
+ this.metadataProperties = metadataProperties;
}
@Override
public ILocalResourceRepository createRepository() throws HyracksDataException {
- return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId);
+ return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId, metadataProperties);
}
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 701e529..8fe75f2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -44,13 +44,11 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.SortedMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.ILocalResourceMetadata;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -72,14 +70,11 @@
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -300,7 +295,6 @@
//get datasetLifeCycleManager
IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- IIOManager ioManager = appRuntimeContext.getIOManager();
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
.loadAndGetAllResources();
@@ -507,13 +501,10 @@
//#. get indexLifeCycleManager
IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
- IIOManager ioManager = appRuntimeContext.getIOManager();
- SortedMap<Integer, ClusterPartition> clusterPartitions = ((IAsterixPropertiesProvider) appRuntimeContext
- .getAppContext()).getMetadataProperties().getClusterPartitions();
IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
- ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
- Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
- .loadAndGetAllResources();
+ PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
+ .getLocalResourceRepository();
+ Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
//#. set log reader to the lowWaterMarkLsn again.
for (int i = 0; i < remoteLogs.size(); i++) {
logRecord = remoteLogs.get(i);
@@ -561,16 +552,11 @@
//get index instance from IndexLifeCycleManager
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
- //get the resource path relative to this node
- int resourcePartition = localResource.getPartition();
- //get partition io device id
- //NOTE:
- //currently we store all partition in the same IO device in all nodes. If this changes,
- //this needs to be updated to find the IO device in which the partition is stored in this local node.
- int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum();
- String resourceAbsolutePath = ioManager
- .getAbsoluteFileRef(ioDevice, localResource.getResourceName()).getFile()
- .getAbsolutePath();
+ //get partition path in this node
+ String partitionIODevicePath = localResourceRepository
+ .getPartitionPath(localResource.getPartition());
+ String resourceAbsolutePath = partitionIODevicePath + File.separator
+ + localResource.getResourceName();
localResource.setResourcePath(resourceAbsolutePath);
index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
if (index == null) {
@@ -578,8 +564,8 @@
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
resourceAbsolutePath, localResource.getPartition());
- datasetLifecycleManager.register(localResource.getResourceName(), index);
- datasetLifecycleManager.open(localResource.getResourceName());
+ datasetLifecycleManager.register(resourceAbsolutePath, index);
+ datasetLifecycleManager.open(resourceAbsolutePath);
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = index;
@@ -1099,6 +1085,242 @@
}
}
+ //TODO (mhubail) RecoveryManager has three methods that perform logs REDO based on different parameters.
+ //They need to be refactored to use partitions only once the log format includes partition id.
+ @Override
+ public synchronized void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode)
+ throws IOException, ACIDException {
+ //delete any recovery files from previous failed recovery attempts
+ deleteRecoveryTemporaryFiles();
+
+ int updateLogCount = 0;
+ int entityCommitLogCount = 0;
+ int jobCommitLogCount = 0;
+ int redoCount = 0;
+ int abortLogCount = 0;
+ int jobId = -1;
+
+ state = SystemState.RECOVERING;
+ LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ...");
+
+ Set<Integer> winnerJobSet = new HashSet<Integer>();
+ jobId2WinnerEntitiesMap = new HashMap<>();
+
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+ JobEntityCommits jobEntityWinners = null;
+ //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
+ long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+ if (lowWaterMarkLSN < readableSmallestLSN) {
+ lowWaterMarkLSN = readableSmallestLSN;
+ }
+ //-------------------------------------------------------------------------
+ // [ analysis phase ]
+ // - collect all committed Lsn
+ //-------------------------------------------------------------------------
+ LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase");
+ IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+ //get datasetLifeCycleManager
+ IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+ PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
+ .getLocalResourceRepository();
+ Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
+ Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+
+ //#. set log reader to the lowWaterMarkLsn
+ ILogReader logReader = logMgr.getLogReader(true);
+ ILogRecord logRecord = null;
+ try {
+ logReader.initializeScan(lowWaterMarkLSN);
+ logRecord = logReader.next();
+ while (logRecord != null) {
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(logRecord.getLogRecordForDisplay());
+ }
+ //TODO update this partitions once the log format is updated to include partitons
+ if (logRecord.getNodeId().equals(failedNode)) {
+ switch (logRecord.getLogType()) {
+ case LogType.UPDATE:
+ updateLogCount++;
+ break;
+ case LogType.JOB_COMMIT:
+ jobId = logRecord.getJobId();
+ winnerJobSet.add(jobId);
+ if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+ jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+ //to delete any spilled files as well
+ jobEntityWinners.clear();
+ jobId2WinnerEntitiesMap.remove(jobId);
+ }
+ jobCommitLogCount++;
+ break;
+ case LogType.ENTITY_COMMIT:
+ jobId = logRecord.getJobId();
+ if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
+ jobEntityWinners = new JobEntityCommits(jobId);
+ if (needToFreeMemory()) {
+ //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk.
+ //This could happen only when we have many jobs with small number of records and none of them have job commit.
+ freeJobsCachedEntities(jobId);
+ }
+ jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
+ } else {
+ jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+ }
+ jobEntityWinners.add(logRecord);
+ entityCommitLogCount++;
+ break;
+ case LogType.ABORT:
+ abortLogCount++;
+ break;
+ case LogType.FLUSH:
+ break;
+ default:
+ throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+ }
+ }
+ logRecord = logReader.next();
+ }
+
+ //prepare winners for search after analysis is done to flush anything remaining in memory to disk.
+ for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
+ winners.prepareForSearch();
+ }
+ //-------------------------------------------------------------------------
+ // [ redo phase ]
+ // - redo if
+ // 1) The TxnId is committed && --> guarantee durability
+ // 2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
+ //-------------------------------------------------------------------------
+ LOGGER.info("[RecoveryMgr] in redo phase");
+
+ long resourceId;
+ long maxDiskLastLsn;
+ long LSN = -1;
+ ILSMIndex index = null;
+ LocalResource localResource = null;
+ ILocalResourceMetadata localResourceMetadata = null;
+ boolean foundWinner = false;
+ //set log reader to the lowWaterMarkLsn again.
+ logReader.initializeScan(lowWaterMarkLSN);
+ logRecord = logReader.next();
+ while (logRecord != null) {
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(logRecord.getLogRecordForDisplay());
+ }
+ //TODO update this to check for partitions instead of node id once the log format is updated to include partitions
+ if (logRecord.getNodeId().equals(failedNode)) {
+ LSN = logRecord.getLSN();
+ jobId = logRecord.getJobId();
+ foundWinner = false;
+ switch (logRecord.getLogType()) {
+ case LogType.UPDATE:
+ if (winnerJobSet.contains(jobId)) {
+ foundWinner = true;
+ } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+ jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+ tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize());
+ if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
+ foundWinner = true;
+ }
+ }
+ if (foundWinner) {
+ resourceId = logRecord.getResourceId();
+ localResource = resourcesMap.get(resourceId);
+ /*******************************************************************
+ * [Notice]
+ * -> Issue
+ * Delete index may cause a problem during redo.
+ * The index operation to be redone couldn't be redone because the corresponding index
+ * may not exist in NC due to the possible index drop DDL operation.
+ * -> Approach
+ * Avoid the problem during redo.
+ * More specifically, the problem will be detected when the localResource of
+ * the corresponding index is retrieved, which will end up with 'null'.
+ * If null is returned, then just go and process the next
+ * log record.
+ *******************************************************************/
+ if (localResource == null) {
+ logRecord = logReader.next();
+ continue;
+ }
+ /*******************************************************************/
+
+ //get index instance from IndexLifeCycleManager
+ //if index is not registered into IndexLifeCycleManager,
+ //create the index using LocalMetadata stored in LocalResourceRepository
+ //get partition path in this node
+ String partitionIODevicePath = localResourceRepository
+ .getPartitionPath(localResource.getPartition());
+ String resourceAbsolutePath = partitionIODevicePath + File.separator
+ + localResource.getResourceName();
+ localResource.setResourcePath(resourceAbsolutePath);
+ index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
+ if (index == null) {
+ //#. create index instance and register to indexLifeCycleManager
+ localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+ index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+ resourceAbsolutePath, localResource.getPartition());
+ datasetLifecycleManager.register(resourceAbsolutePath, index);
+ datasetLifecycleManager.open(resourceAbsolutePath);
+
+ //#. get maxDiskLastLSN
+ ILSMIndex lsmIndex = index;
+ try {
+ maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
+ .getIOOperationCallback())
+ .getComponentLSN(lsmIndex.getImmutableComponents());
+ } catch (HyracksDataException e) {
+ datasetLifecycleManager.close(resourceAbsolutePath);
+ throw e;
+ }
+
+ //#. set resourceId and maxDiskLastLSN to the map
+ resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
+ } else {
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+ }
+
+ if (LSN > maxDiskLastLsn) {
+ redo(logRecord, datasetLifecycleManager);
+ redoCount++;
+ }
+ }
+ break;
+ case LogType.JOB_COMMIT:
+ case LogType.ENTITY_COMMIT:
+ case LogType.ABORT:
+ case LogType.FLUSH:
+ //do nothing
+ break;
+ default:
+ throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+ }
+ }
+ logRecord = logReader.next();
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("[RecoveryMgr] recovery is completed.");
+ LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
+ + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount
+ + "/" + redoCount);
+ }
+ } finally {
+ logReader.close();
+
+ //close all indexes
+ Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
+ for (long r : resourceIdList) {
+ datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
+ }
+
+ //delete any recovery files after completing recovery
+ deleteRecoveryTemporaryFiles();
+ }
+ }
+
private class JobEntityCommits {
private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
private final int jobId;
@@ -1145,7 +1367,7 @@
/**
* Call this method when no more entity commits will be added to this job.
- *
+ *
* @throws IOException
*/
public void prepareForSearch() throws IOException {
@@ -1182,7 +1404,6 @@
*/
public ArrayList<File> getCandidiatePartitions(long logLSN) {
ArrayList<File> candidiatePartitions = new ArrayList<File>();
-
for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
String partitionName = partition.getName();
//entity commit log must come after the update log, therefore, consider only partitions with max LSN > logLSN