Remove Log-Based Remote Recovery
This change removes the code that supports log-based remote recovery.
Remote recovery was replaced by the failback process which depends on
copying LSM disk component instead of logs.
Change-Id: I86e3b5832b52207e36c8409a072ccbda564d78b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/871
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 643bb16..0a6d62d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -123,18 +123,13 @@
}
//do not attempt to perform remote recovery if this is a virtual NC
- if (replicationEnabled && !virtualNC) {
+ if (autoFailover && !virtualNC) {
if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
- //Try to perform remote recovery
+ //Start failback process
IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
- if (autoFailover) {
- remoteRecoveryMgr.startFailbackProcess();
- systemState = SystemState.RECOVERING;
- pendingFailbackCompletion = true;
- } else {
- remoteRecoveryMgr.performRemoteRecovery();
- systemState = SystemState.HEALTHY;
- }
+ remoteRecoveryMgr.startFailbackProcess();
+ systemState = SystemState.RECOVERING;
+ pendingFailbackCompletion = true;
}
} else {
//recover if the system is corrupted by checking system state.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 921fd37..a2738e8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -25,12 +25,8 @@
public interface IRemoteRecoveryManager {
/**
- * Attempts to perform the remote recovery process from an active remote replica.
- */
- public void performRemoteRecovery();
-
- /**
* Performs the partitions takeover process from the {@code failedNode}
+ *
* @param failedNode
* @param partitions
* @throws IOException
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 3fc2af0..755fbbd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -18,11 +18,9 @@
*/
package org.apache.asterix.common.replication;
-import java.io.File;
import java.io.IOException;
import java.util.Set;
-import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.hyracks.api.replication.IIOReplicationManager;
@@ -46,23 +44,6 @@
public boolean hasBeenReplicated(ILogRecord logRecord);
/**
- * Requests txns logs from a remote replica.
- *
- * @param remoteReplicaId
- * The replica id to send the request to.
- * @param replicasDataToRecover
- * Get logs that belong to those replicas.
- * @param fromLSN
- * Low water mark for logs to be requested.
- * @param recoveryLogsFile
- * a temporary file to store the logs required for recovery
- * @throws IOException
- * @throws ACIDException
- */
- public void requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover, long fromLSN,
- File recoveryLogsFile) throws IOException, ACIDException;
-
- /**
* Requests LSM components files from a remote replica.
*
* @param remoteReplicaId
@@ -127,16 +108,6 @@
public void reportReplicaEvent(ReplicaEvent event);
/**
- * Requests the current minimum LSN of a remote replica.
- *
- * @param replicaId
- * The replica to send the request to.
- * @return The returned minimum LSN from the remote replica.
- * @throws IOException
- */
- public long requestReplicaMinLSN(String replicaId) throws IOException;
-
- /**
* Sends a request to remote replicas to flush indexes that have LSN less than nonSharpCheckpointTargetLSN
*
* @param nonSharpCheckpointTargetLSN
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index c35b155..a16eef8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -560,10 +560,6 @@
//serialized node id String
serilizedSize += Integer.BYTES + nodeId.length();
}
- if (logSource == LogSource.REMOTE_RECOVERY) {
- //for LSN;
- serilizedSize += Long.BYTES;
- }
serilizedSize -= CHKSUM_LEN;
return serilizedSize;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
index 4f3d5df..75cf6ba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
@@ -22,11 +22,9 @@
public static final byte LOCAL = 0;
public static final byte REMOTE = 1;
- public static final byte REMOTE_RECOVERY = 2;
private static final String STRING_LOCAL = "LOCAL";
private static final String STRING_REMOTE = "REMOTE";
- private static final String STRING_REMOTE_RECOVERY = "REMOTE_RECOVERY";
private static final String STRING_INVALID_LOG_SOURCE = "INVALID_LOG_SOURCE";
@@ -36,8 +34,6 @@
return STRING_LOCAL;
case REMOTE:
return STRING_REMOTE;
- case REMOTE_RECOVERY:
- return STRING_REMOTE_RECOVERY;
default:
return STRING_INVALID_LOG_SOURCE;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 1ff6cc4..608e442 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -27,7 +27,6 @@
import java.nio.channels.SocketChannel;
import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
@@ -35,7 +34,7 @@
public class ReplicationProtocol {
/**
- * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
+ * All replication messages start with ReplicationRequestType (4 bytes), then the length of the request in bytes
*/
public static final String JOB_REPLICATION_ACK = "$";
@@ -48,9 +47,7 @@
* REPLICATE_FILE: replicate a file(s)
* DELETE_FILE: delete a file(s)
* GET_REPLICA_FILES: used during remote recovery to request lost LSM Components
- * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
* GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
- * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
* GOODBYE: used to notify replicas that the replication request has been completed
* REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
* LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
@@ -62,9 +59,7 @@
REPLICATE_FILE,
DELETE_FILE,
GET_REPLICA_FILES,
- GET_REPLICA_LOGS,
GET_REPLICA_MAX_LSN,
- GET_REPLICA_MIN_LSN,
GOODBYE,
REPLICA_EVENT,
LSM_COMPONENT_PROPERTIES,
@@ -90,21 +85,20 @@
public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- lsmCompProp.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (buffer.capacity() < requestSize) {
- buffer = ByteBuffer.allocate(requestSize);
- } else {
- buffer.clear();
+ try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+ lsmCompProp.serialize(oos);
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (buffer.capacity() < requestSize) {
+ buffer = ByteBuffer.allocate(requestSize);
+ } else {
+ buffer.clear();
+ }
+ buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ buffer.flip();
+ return buffer;
}
- buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- buffer.flip();
- return buffer;
}
public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
@@ -136,47 +130,23 @@
return bb;
}
- public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
- requestBuffer.clear();
- //put request type (4 bytes)
- requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
- //leave space for log size
- requestBuffer.position(requestBuffer.position() + Integer.BYTES);
- int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
- //put request size (4 bytes)
- requestBuffer.putInt(4, logSize);
- requestBuffer.flip();
- }
-
- public static void writeReplicateLogRequest(ByteBuffer requestBuffer, byte[] serializedLog) {
- requestBuffer.clear();
- //put request type (4 bytes)
- requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
- //length of the log
- requestBuffer.putInt(serializedLog.length);
- //the log itself
- requestBuffer.put(serializedLog);
- requestBuffer.flip();
- }
-
public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
ReplicationRequestType requestType) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- afp.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (requestBuffer.capacity() < requestSize) {
- requestBuffer = ByteBuffer.allocate(requestSize);
- } else {
- requestBuffer.clear();
+ try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+ afp.serialize(oos);
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (requestBuffer.capacity() < requestSize) {
+ requestBuffer = ByteBuffer.allocate(requestSize);
+ } else {
+ requestBuffer.clear();
+ }
+ requestBuffer.putInt(requestType.ordinal());
+ requestBuffer.putInt(oos.size());
+ requestBuffer.put(outputStream.toByteArray());
+ requestBuffer.flip();
+ return requestBuffer;
}
- requestBuffer.putInt(requestType.ordinal());
- requestBuffer.putInt(oos.size());
- requestBuffer.put(outputStream.toByteArray());
- requestBuffer.flip();
- return requestBuffer;
}
public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
@@ -185,44 +155,17 @@
return LSMIndexFileProperties.create(dis);
}
- public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
- ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
- DataInputStream dis = new DataInputStream(bais);
- return ReplicaLogsRequest.create(dis);
- }
-
- public static ByteBuffer writeGetReplicaLogsRequest(ByteBuffer requestBuffer, ReplicaLogsRequest request)
- throws IOException {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- request.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (requestBuffer.capacity() < requestSize) {
- requestBuffer = ByteBuffer.allocate(requestSize);
- } else {
- requestBuffer.clear();
- }
- requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_LOGS.ordinal());
- requestBuffer.putInt(oos.size());
- requestBuffer.put(outputStream.toByteArray());
- requestBuffer.flip();
- return requestBuffer;
- }
-
public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- event.serialize(oos);
- oos.close();
-
- ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
- buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- buffer.flip();
- return buffer;
+ try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+ event.serialize(oos);
+ ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
+ buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ buffer.flip();
+ return buffer;
+ }
}
public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
@@ -255,21 +198,20 @@
public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream oos = new DataOutputStream(outputStream);
- request.serialize(oos);
- oos.close();
-
- int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
- if (buffer.capacity() < requestSize) {
- buffer = ByteBuffer.allocate(requestSize);
- } else {
- buffer.clear();
+ try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+ request.serialize(oos);
+ int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+ if (buffer.capacity() < requestSize) {
+ buffer = ByteBuffer.allocate(requestSize);
+ } else {
+ buffer.clear();
+ }
+ buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
+ buffer.putInt(oos.size());
+ buffer.put(outputStream.toByteArray());
+ buffer.flip();
+ return buffer;
}
- buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
- buffer.putInt(oos.size());
- buffer.put(outputStream.toByteArray());
- buffer.flip();
- return buffer;
}
public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException {
@@ -290,12 +232,6 @@
requestBuffer.flip();
}
- public static void writeMinLSNRequest(ByteBuffer requestBuffer) {
- requestBuffer.clear();
- requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MIN_LSN.ordinal());
- requestBuffer.flip();
- }
-
public static int getJobIdFromLogAckMessage(String msg) {
return Integer.parseInt(msg.substring((msg.indexOf(JOB_REPLICATION_ACK) + 1)));
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 331116b..a152f6c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -56,15 +56,12 @@
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.ILogReader;
-import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
-import org.apache.asterix.replication.functions.ReplicaLogsRequest;
import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.logging.RemoteLogMapping;
@@ -77,6 +74,8 @@
import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
/**
* This class is used to receive and process replication requests from remote replicas or replica events from CC
@@ -93,7 +92,7 @@
private final IReplicationManager replicationManager;
private final AsterixReplicationProperties replicationProperties;
private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
- private final static int INTIAL_BUFFER_SIZE = 4000; //4KB
+ private final static int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
@@ -195,18 +194,6 @@
}
}
- private static void sendRemoteRecoveryLog(ILogRecord logRecord, SocketChannel socketChannel, ByteBuffer outBuffer)
- throws IOException {
- logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
- if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
- int requestSize = logRecord.getSerializedLogSize() + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
- outBuffer = ByteBuffer.allocate(requestSize);
- }
- //set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
- ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
- NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
- }
-
/**
* A replication thread is created per received replication request.
*/
@@ -249,15 +236,9 @@
case GET_REPLICA_MAX_LSN:
handleGetReplicaMaxLSN();
break;
- case GET_REPLICA_MIN_LSN:
- handleGetReplicaMinLSN();
- break;
case GET_REPLICA_FILES:
handleGetReplicaFiles();
break;
- case GET_REPLICA_LOGS:
- handleGetRemoteLogs();
- break;
case FLUSH_INDEX:
handleFlushIndex();
break;
@@ -373,15 +354,6 @@
NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
}
- private void handleGetReplicaMinLSN() throws IOException {
- long minLSN = asterixAppRuntimeContextProvider.getAppContext().getTransactionSubsystem()
- .getRecoveryManager().getMinFirstLSN();
- outBuffer.clear();
- outBuffer.putLong(minLSN);
- outBuffer.flip();
- NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
- }
-
private void handleGetReplicaFiles() throws IOException {
inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
@@ -426,75 +398,6 @@
ReplicationProtocol.sendGoodbye(socketChannel);
}
- private void handleGetRemoteLogs() throws IOException, ACIDException {
- inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
- ReplicaLogsRequest request = ReplicationProtocol.readReplicaLogsRequest(inBuffer);
-
- Set<String> replicaIds = request.getReplicaIds();
- //get list of partitions that belong to the replicas in the request
- Set<Integer> requestedPartitions = new HashSet<>();
- Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
- .getAppContext()).getMetadataProperties().getNodePartitions();
- for (String replicaId : replicaIds) {
- //get replica partitions
- ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
- for (ClusterPartition partition : replicaPatitions) {
- requestedPartitions.add(partition.getPartitionId());
- }
- }
-
- long fromLSN = request.getFromLSN();
- long minLocalFirstLSN = asterixAppRuntimeContextProvider.getAppContext().getTransactionSubsystem()
- .getRecoveryManager().getLocalMinFirstLSN();
-
- //get Log reader
- ILogReader logReader = logManager.getLogReader(true);
- try {
- if (fromLSN < logManager.getReadableSmallestLSN()) {
- fromLSN = logManager.getReadableSmallestLSN();
- }
-
- logReader.initializeScan(fromLSN);
- ILogRecord logRecord = logReader.next();
- Set<Integer> requestedPartitionsJobs = new HashSet<>();
- while (logRecord != null) {
- //we should not send any local log which has already been converted to disk component
- if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLSN() < minLocalFirstLSN) {
- logRecord = logReader.next();
- continue;
- }
- //send only logs that belong to the partitions of the request and required for recovery
- switch (logRecord.getLogType()) {
- case LogType.UPDATE:
- case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
- if (requestedPartitions.contains(logRecord.getResourcePartition())) {
- sendRemoteRecoveryLog(logRecord, socketChannel, outBuffer);
- requestedPartitionsJobs.add(logRecord.getJobId());
- }
- break;
- case LogType.JOB_COMMIT:
- if (requestedPartitionsJobs.contains(logRecord.getJobId())) {
- sendRemoteRecoveryLog(logRecord, socketChannel, outBuffer);
- requestedPartitionsJobs.remove(logRecord.getJobId());
- }
- break;
- case LogType.ABORT:
- case LogType.FLUSH:
- break;
- default:
- throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
- }
- logRecord = logReader.next();
- }
- } finally {
- logReader.close();
- }
-
- //send goodbye (end of logs)
- ReplicationProtocol.sendGoodbye(socketChannel);
- }
-
private void handleReplicaEvent() throws IOException {
inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
@@ -691,4 +594,4 @@
}
}
}
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index d983a62..ee872a5 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -55,7 +55,6 @@
import org.apache.asterix.common.config.AsterixReplicationProperties;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
-import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.AsterixReplicationJob;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationManager;
@@ -65,12 +64,10 @@
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
-import org.apache.asterix.replication.functions.ReplicaLogsRequest;
import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.logging.ReplicationLogBuffer;
@@ -1039,91 +1036,6 @@
}
}
- //Recovery Method
- @Override
- public long requestReplicaMinLSN(String selectedReplicaId) throws IOException {
- long minLSN = 0;
- ReplicationProtocol.writeMinLSNRequest(dataBuffer);
- try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId);) {
- //transfer request
- NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
-
- //read response
- NetworkingUtil.readBytes(socketChannel, dataBuffer, Long.BYTES);
- minLSN = dataBuffer.getLong();
-
- //send goodbye
- ReplicationProtocol.sendGoodbye(socketChannel);
- }
-
- return minLSN;
- }
-
- //Recovery Method
- @Override
- public void requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN,
- File recoveryLogsFile) throws IOException, ACIDException {
- ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN);
- dataBuffer = ReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
- try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) {
- //transfer request
- NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
-
- //read response type
- ReplicationRequestType responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
-
- ILogRecord logRecord = new LogRecord();
- Set<Integer> nodePartitions = ((PersistentLocalResourceRepository) asterixAppRuntimeContextProvider
- .getLocalResourceRepository()).getNodeOrignalPartitions();
- Set<Integer> nodePartitionsJobs = new HashSet<>();
- try (RandomAccessFile raf = new RandomAccessFile(recoveryLogsFile, "rw");
- FileChannel fileChannel = raf.getChannel();) {
- while (responseType != ReplicationRequestType.GOODBYE) {
- dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
- logRecord.readRemoteLog(dataBuffer, true);
- switch (logRecord.getLogType()) {
- case LogType.UPDATE:
- case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
- if (nodePartitions.contains(logRecord.getResourcePartition())) {
- nodePartitionsJobs.add(logRecord.getJobId());
- dataBuffer.flip();
- while (dataBuffer.hasRemaining()) {
- //store log in temp file to replay it for recovery
- fileChannel.write(dataBuffer);
- }
- } else {
- //send log to log manager as a remote recovery log
- logManager.log(logRecord);
- }
- break;
- case LogType.JOB_COMMIT:
- if (nodePartitionsJobs.contains(logRecord.getJobId())) {
- nodePartitionsJobs.remove(logRecord.getJobId());
- dataBuffer.flip();
- while (dataBuffer.hasRemaining()) {
- //store log in temp file to replay it for recovery
- fileChannel.write(dataBuffer);
- }
- break;
- }
- logManager.log(logRecord);
- break;
- case LogType.ABORT:
- case LogType.FLUSH:
- break;
- default:
- throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
- }
- responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
- }
- }
-
- //send goodbye
- ReplicationProtocol.sendGoodbye(socketChannel);
- }
- }
-
public int getLogPageSize() {
return logManager.getLogPageSize();
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 3c6fc0e..47e60b2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -18,10 +18,7 @@
*/
package org.apache.asterix.replication.recovery;
-import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,21 +37,17 @@
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.ILogReader;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.asterix.transaction.management.service.logging.RemoteLogReader;
public class RemoteRecoveryManager implements IRemoteRecoveryManager {
- private static final String RECOVERY_LOGS_FILE_NAME = "recoveryLogs";
private final IReplicationManager replicationManager;
private static final Logger LOGGER = Logger.getLogger(RemoteRecoveryManager.class.getName());
private final IAsterixAppRuntimeContext runtimeContext;
private final AsterixReplicationProperties replicationProperties;
- private final static int REMOTE_RECOVERY_JOB_ID = -1;
private Map<String, Set<String>> failbackRecoveryReplicas;
public RemoteRecoveryManager(IReplicationManager replicationManager, IAsterixAppRuntimeContext runtimeContext,
@@ -64,93 +57,6 @@
this.replicationProperties = replicationProperties;
}
- @Override
- public void performRemoteRecovery() {
- //The whole remote recovery process should be atomic.
- //Any error happens, we should start the recovery from the start until the recovery is
- //complete or an illegal state is reached (cannot recover or max attempts exceed).
- int maxRecoveryAttempts = replicationProperties.getMaxRemoteRecoveryAttempts();
- PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
- .getLocalResourceRepository();
- IRecoveryManager recoveryManager = runtimeContext.getTransactionSubsystem().getRecoveryManager();
- ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
- while (true) {
- //start recovery steps
- try {
- if (maxRecoveryAttempts <= 0) {
- //to avoid infinite loop in case of unexpected behavior.
- throw new IllegalStateException("Failed to perform remote recovery.");
- }
-
- //delete any existing recovery files from previous failed recovery attempts
- recoveryManager.deleteRecoveryTemporaryFiles();
-
- //create temporary file to store recovery logs
- File recoveryLogsFile = recoveryManager.createJobRecoveryFile(REMOTE_RECOVERY_JOB_ID,
- RECOVERY_LOGS_FILE_NAME);
-
- /*** Prepare for Recovery ***/
- //1. check remote replicas states
- replicationManager.initializeReplicasState();
- int activeReplicasCount = replicationManager.getActiveReplicasCount();
-
- if (activeReplicasCount == 0) {
- throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
- }
-
- //2. clean any memory data that could've existed from previous failed recovery attempt
- IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
- datasetLifeCycleManager.closeAllDatasets();
-
- //3. remove any existing storage data and initialize storage metadata
- resourceRepository.deleteStorageData(true);
- resourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
-
- //4. select remote replicas to recover from per lost replica data
- Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
-
- //5. get max LSN from selected remote replicas
- long maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
-
- //6. force LogManager to start from a partition > maxLSN in selected remote replicas
- logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
-
- /*** Start Recovery Per Lost Replica ***/
- for (Entry<String, Set<String>> remoteReplica : selectedRemoteReplicas.entrySet()) {
- String replicaId = remoteReplica.getKey();
- Set<String> replicasDataToRecover = remoteReplica.getValue();
-
- //Request indexes metadata and LSM components
- replicationManager.requestReplicaFiles(replicaId, replicasDataToRecover, new HashSet<String>());
-
- //Get min LSN to start requesting logs from
- long minLSN = replicationManager.requestReplicaMinLSN(replicaId);
-
- //Request remote logs from selected remote replicas
- replicationManager.requestReplicaLogs(replicaId, replicasDataToRecover, minLSN, recoveryLogsFile);
-
- //Replay remote logs using recovery manager
- if (replicasDataToRecover.contains(logManager.getNodeId())) {
- //replay logs for local partitions only
- Set<Integer> nodePartitions = resourceRepository.getNodeOrignalPartitions();
- try (RandomAccessFile raf = new RandomAccessFile(recoveryLogsFile, "r");
- FileChannel fileChannel = raf.getChannel();) {
- ILogReader logReader = new RemoteLogReader(fileChannel, fileChannel.size(),
- logManager.getLogPageSize());
- recoveryManager.replayPartitionsLogs(nodePartitions, logReader, 0);
- }
- }
- }
- LOGGER.log(Level.INFO, "Completed remote recovery successfully!");
- break;
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...");
- maxRecoveryAttempts--;
- }
- }
- }
-
private Map<String, Set<String>> constructRemoteRecoveryPlan() {
//1. identify which replicas reside in this node
String localNodeId = runtimeContext.getTransactionSubsystem().getId();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
deleted file mode 100644
index 4dc1700..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.logging;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.ILogReader;
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.ILogRecord.RecordReadStatus;
-import org.apache.asterix.common.transactions.LogRecord;
-
-public class RemoteLogReader implements ILogReader {
-
- private final FileChannel fileChannel;
- private final ILogRecord logRecord;
- private final ByteBuffer readBuffer;
- private long readLSN;
- private final int logPageSize;
-
- public RemoteLogReader(FileChannel fileChannel, long logFileSize, int logPageSize) {
- this.fileChannel = fileChannel;
- this.logPageSize = logPageSize;
- logRecord = new LogRecord();
- readBuffer = ByteBuffer.allocate(logPageSize);
- }
-
- @Override
- public void initializeScan(long beginLSN) throws ACIDException {
- readLSN = beginLSN;
- fillLogReadBuffer();
- }
-
- private boolean fillLogReadBuffer() throws ACIDException {
- return fillLogReadBuffer(logPageSize, readBuffer);
- }
-
- private boolean fillLogReadBuffer(int pageSize, ByteBuffer readBuffer) throws ACIDException {
- int size = 0;
- int read = 0;
- readBuffer.position(0);
- readBuffer.limit(logPageSize);
- try {
- fileChannel.position(readLSN);
- //We loop here because read() may return 0, but this simply means we are waiting on IO.
- //Therefore we want to break out only when either the buffer is full, or we reach EOF.
- while (size < pageSize && read != -1) {
- read = fileChannel.read(readBuffer);
- if (read > 0) {
- size += read;
- }
- }
- } catch (IOException e) {
- throw new ACIDException(e);
- }
- readBuffer.position(0);
- readBuffer.limit(size);
- if (size == 0 && read == -1) {
- return false; //EOF
- }
- return true;
- }
-
- @Override
- public ILogRecord read(long LSN) throws ACIDException {
- throw new UnsupportedOperationException("Random read is not supported.");
- }
-
- @Override
- public ILogRecord next() throws ACIDException {
- if (readBuffer.position() == readBuffer.limit()) {
- boolean hasRemaining = fillLogReadBuffer();
- if (!hasRemaining) {
- return null;
- }
- }
- ByteBuffer readBuffer = this.readBuffer;
- boolean refilled = false;
-
- while (true) {
- RecordReadStatus status = logRecord.readRemoteLog(readBuffer, true);
- switch (status) {
- case TRUNCATED: {
- if (!refilled) {
- //we may have just read off the end of the buffer, so try refiling it
- if (!fillLogReadBuffer()) {
- return null;
- }
- refilled = true;
- //now see what we have in the refilled buffer
- continue;
- }
- return null;
- }
- case LARGE_RECORD: {
- readBuffer = ByteBuffer.allocate(logRecord.getLogSize());
- fillLogReadBuffer(logRecord.getLogSize(), readBuffer);
- //now see what we have in the expanded buffer
- continue;
- }
- case BAD_CHKSUM: {
- return null;
- }
- case OK:
- break;
- }
- break;
- }
-
- readLSN += logRecord.getSerializedLogSize();
- return logRecord;
- }
-
- @Override
- public void close() throws ACIDException {
- try {
- if (fileChannel != null) {
- if (fileChannel.isOpen()) {
- fileChannel.close();
- }
- }
- } catch (IOException e) {
- throw new ACIDException(e);
- }
- }
-
-}