Asterix NCs Fault Tolerance
This change includes the following:
- Adapt replication to unique partitions storage.
- Implement auto failover for failing NCs.
- Implement auto failover for metadata node.
- Fix for ASTERIXDB-1251 using a proper error message.
- Basic replication test cases using vagrant virtual cluster for:
1. LSM bulkload components replication.
2. LSM Memory components replication and recovery.
3. Metadata node takeover.
These test cases will be part of the cluster test profile.
Change-Id: Ice26d980912a315fcb3efdd571d6ce88717cfea4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/573
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3386252..975180b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.api;
import java.io.IOException;
+import java.rmi.RemoteException;
import java.util.List;
import java.util.concurrent.Executor;
@@ -89,4 +90,17 @@
public IReplicationChannel getReplicationChannel();
public void initializeResourceIdFactory() throws HyracksDataException;
+
+ /**
+ * Exports the metadata node stub to the metadata RMI port.
+ * @throws RemoteException presumably when the RMI export fails; confirm against the implementation
+ */
+ public void exportMetadataNodeStub() throws RemoteException;
+
+ /**
+ * Initializes the metadata node and bootstraps the metadata.
+ * @param newUniverse presumably {@code true} when the metadata is bootstrapped for a brand-new instance; confirm with callers
+ * @throws Exception presumably when initialization or bootstrapping fails; exact causes are implementation-specific
+ */
+ public void initializeMetadata(boolean newUniverse) throws Exception;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
new file mode 100644
index 0000000..48b1e73
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.asterix.common.api.IClusterEventsSubscriber;
+
+public interface IGlobalRecoveryMaanger extends IClusterEventsSubscriber {
+ // TODO(review): "Maanger" is a typo of "Manager"; renaming requires updating every implementer and caller.
+ /**
+ * Starts the global recovery process if the cluster state changed to ACTIVE.
+ */
+ public void startGlobalRecovery();
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 8e2c4e7..473a163 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -65,4 +65,8 @@
public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
return accessor.getClusterPartitions();
}
+
+ public Map<String, String> getTransactionLogDirs() {
+ return accessor.getTransactionLogDirs();
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 1ef7e3e..fa5b503 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -36,7 +36,6 @@
private static int REPLICATION_TIME_OUT_DEFAULT = 15;
private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
- private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
private final String NODE_NAME_PREFIX;
private final Cluster cluster;
@@ -102,8 +101,8 @@
}
if (nodeIndex == -1) {
- LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
- + " in cluster configurations");
+ LOGGER.log(Level.WARNING,
+ "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
return null;
}
@@ -179,13 +178,6 @@
return replicaIds;
}
- public String getReplicationStore() {
- if (cluster != null) {
- return cluster.getDataReplication().getReplicationStore();
- }
- return REPLICATION_STORE_DEFAULT;
- }
-
public int getReplicationFactor() {
if (cluster != null) {
if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
@@ -202,5 +194,4 @@
}
return REPLICATION_TIME_OUT_DEFAULT;
}
-
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
index 8dc3efe..a5cd72b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.dataflow;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.common.IStorageManagerInterface;
@@ -48,4 +49,6 @@
* @return ICCApplicationContext implementation instance
*/
public ICCApplicationContext getCCApplicationContext();
+
+ public IGlobalRecoveryMaanger getGlobalRecoveryManager();
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..78b86a8
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..78f7429
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final String nodeId;
+
+ public TakeoverMetadataNodeResponseMessage(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..abfa7d2
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final Integer[] partitions;
+ private final String failedNode;
+ private final long requestId;
+ private final String nodeId;
+
+ public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
+ Integer[] partitionsToTakeover) {
+ this.requestId = requestId;
+ this.nodeId = nodeId;
+ this.failedNode = failedNode;
+ this.partitions = partitionsToTakeover;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
+ }
+
+ public Integer[] getPartitions() {
+ return partitions;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+
+ public String getFailedNode() {
+ return failedNode;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+    /** Returns a one-line, human-readable summary of this request; partition ids are comma-separated. */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("Request ID: ").append(requestId);
+        sb.append(" Node ID: ").append(nodeId);
+        sb.append(" Failed Node: ").append(failedNode);
+        sb.append(" Partitions: ");
+        // Write the separator before every element except the first, so the list never ends with a
+        // dangling comma (reading the last character with charAt() does not remove it).
+        for (int i = 0; i < partitions.length; i++) {
+            sb.append(i == 0 ? "" : ",").append(partitions[i]);
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..86eb3cb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+
+ private static final long serialVersionUID = 1L;
+ private final Integer[] partitions;
+ private final String nodeId;
+ private final long requestId;
+
+ public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+ this.requestId = requestId;
+ this.nodeId = nodeId;
+ this.partitions = partitionsToTakeover;
+ }
+
+ @Override
+ public ApplicationMessageType getMessageType() {
+ return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
+ }
+
+ public Integer[] getPartitions() {
+ return partitions;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public long getRequestId() {
+ return requestId;
+ }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 61ab7cd..57a0dae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -26,7 +26,11 @@
RESOURCE_ID_REQUEST,
RESOURCE_ID_RESPONSE,
REPORT_MAX_RESOURCE_ID_REQUEST,
- REPORT_MAX_RESOURCE_ID_RESPONSE
+ REPORT_MAX_RESOURCE_ID_RESPONSE,
+ TAKEOVER_PARTITIONS_REQUEST,
+ TAKEOVER_PARTITIONS_RESPONSE,
+ TAKEOVER_METADATA_NODE_REQUEST,
+ TAKEOVER_METADATA_NODE_RESPONSE
}
public abstract ApplicationMessageType getMessageType();
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
new file mode 100644
index 0000000..7dafbd5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+public interface ICCMessageBroker extends IMessageBroker {
+
+ /**
+ * Sends the passed message to the specified {@code nodeId}.
+ * @param msg the application message to send
+ * @param nodeId the id of the destination node (an NC, per the method name)
+ * @throws Exception presumably when the message cannot be delivered; confirm against implementations
+ */
+ public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception;
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 63d29a0..ecc9494 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -18,8 +18,24 @@
*/
package org.apache.asterix.common.replication;
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+
public interface IRemoteRecoveryManager {
+ /**
+ * Attempts to perform the remote recovery process from an active remote replica.
+ */
public void performRemoteRecovery();
+ /**
+ * Performs the takeover of {@code partitions} from the {@code failedNode}. TODO(review): "Partitons" in the name is a typo.
+ * @param failedNode the node whose partitions are being taken over
+ * @param partitions the partitions to take over
+ * @throws IOException
+ * @throws ACIDException
+ */
+ public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index c796f37..f13d300 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -22,10 +22,15 @@
public interface IReplicaResourcesManager {
- public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
-
- public String getLocalStorageFolder();
-
+ /**
+ * @param remoteNodes the remote nodes to consider (presumably node ids; confirm with callers)
+ * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
+ */
public long getMinRemoteLSN(Set<String> remoteNodes);
+ /**
+ * @param partitions the partitions to consider (presumably partition ids; confirm with callers)
+ * @return the minimum LSN of all indexes that belong to {@code partitions}.
+ */
+ public long getPartitionsMinLSN(Integer[] partitions);
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 9ea9957..a2b7a82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -111,4 +111,14 @@
* @throws HyracksDataException
*/
public long getLocalMinFirstLSN() throws HyracksDataException;
+
+ /**
+ * Replays the logs that belong to the passed {@code partitions}, starting from the {@code lowWaterMarkLSN}.
+ * @param partitions the partitions whose logs are replayed
+ * @param lowWaterMarkLSN the LSN from which the replay starts
+ * @param failedNode presumably the node the partitions are being taken over from; confirm with callers
+ * @throws IOException
+ * @throws ACIDException
+ */
+ public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index acfb9d5..48e42bd 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -56,6 +56,14 @@
}
public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
- return dataverseName + File.separator + datasetName + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + idxName;
+ return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+ }
+
+ public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
+ return dataverseName + File.separator + fullIndexName;
+ }
+
+ private static String prepareFullIndexName(String datasetName, String idxName) {
+ return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
}
}
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 872c959..e0605f0 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -47,7 +47,7 @@
<xs:element name="enabled" type="xs:boolean" />
<xs:element name="replication_port" type="xs:integer" />
<xs:element name="replication_factor" type="xs:integer" />
- <xs:element name="replication_store" type="xs:string" />
+ <xs:element name="auto_failover" type="xs:boolean" />
<xs:element name="replication_time_out" type="xs:integer" />
<!-- definition of complex elements -->
@@ -82,7 +82,7 @@
<xs:element ref="cl:enabled" />
<xs:element ref="cl:replication_port" />
<xs:element ref="cl:replication_factor" />
- <xs:element ref="cl:replication_store" />
+ <xs:element ref="cl:auto_failover" />
<xs:element ref="cl:replication_time_out" />
</xs:sequence>
</xs:complexType>