Txn Log Replication Optimizations
- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate HashMap iterator object creation per log batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Bug fixes for big objects replication.
- Some SONAR fixes for old code.
Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Reviewed-on: https://asterix-gerrit.ics.uci.edu/883
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 0a6d62d..c71d77e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -150,7 +150,7 @@
}
}
- private void startReplicationService() {
+ private void startReplicationService() throws InterruptedException {
//Open replication channel
runtimeContext.getReplicationChannel().start();
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index b1d0649..1d693f9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,10 +98,4 @@
<description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
</description>
</property>
- <property>
- <name>log.level</name>
- <value>WARNING</value>
- <description>The minimum log level to be displayed. (Default = INFO)
- </description>
- </property>
</asterixConfiguration>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 5d31d9a..7f51bbd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -26,19 +26,31 @@
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
public class AsterixReplicationProperties extends AbstractAsterixProperties {
private static final Logger LOGGER = Logger.getLogger(AsterixReplicationProperties.class.getName());
- private static int REPLICATION_DATAPORT_DEFAULT = 2000;
- private static int REPLICATION_FACTOR_DEFAULT = 1;
- private static int REPLICATION_TIME_OUT_DEFAULT = 15;
+ private static final int REPLICATION_DATAPORT_DEFAULT = 2000;
+ private static final int REPLICATION_FACTOR_DEFAULT = 1;
+ private static final int REPLICATION_TIME_OUT_DEFAULT = 15;
private static final int MAX_REMOTE_RECOVERY_ATTEMPTS = 5;
private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
private final String NODE_NAME_PREFIX;
private final Cluster cluster;
+ private static final String REPLICATION_LOG_BATCH_SIZE_KEY = "replication.log.batchsize";
+ private static final int REPLICATION_LOG_BATCH_SIZE_DEFAULT = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+
+ private static final String REPLICATION_LOG_BUFFER_NUM_PAGES_KEY = "replication.log.buffer.numpages";
+ private static final int REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT = 8;
+
+ private static final String REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY = "replication.log.buffer.pagesize";
+ private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128,
+ StorageUnit.KILOBYTE);
+
public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, Cluster cluster) {
super(accessor);
this.cluster = cluster;
@@ -90,7 +102,7 @@
}
public Set<Replica> getRemoteReplicas(String nodeId) {
- Set<Replica> remoteReplicas = new HashSet<Replica>();;
+ Set<Replica> remoteReplicas = new HashSet<>();;
int numberOfRemoteReplicas = getReplicationFactor() - 1;
//Using chained-declustering
@@ -161,7 +173,7 @@
}
public Set<String> getRemoteReplicasIds(String nodeId) {
- Set<String> remoteReplicasIds = new HashSet<String>();
+ Set<String> remoteReplicasIds = new HashSet<>();
Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
for (Replica replica : remoteReplicas) {
@@ -176,7 +188,7 @@
}
public Set<String> getNodeReplicasIds(String nodeId) {
- Set<String> replicaIds = new HashSet<String>();
+ Set<String> replicaIds = new HashSet<>();
replicaIds.add(nodeId);
replicaIds.addAll(getRemoteReplicasIds(nodeId));
return replicaIds;
@@ -245,4 +257,19 @@
public int getMaxRemoteRecoveryAttempts() {
return MAX_REMOTE_RECOVERY_ATTEMPTS;
}
+
+ public int getLogBufferPageSize() {
+ return accessor.getProperty(REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY, REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ public int getLogBufferNumOfPages() {
+ return accessor.getProperty(REPLICATION_LOG_BUFFER_NUM_PAGES_KEY, REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ public int getLogBatchSize() {
+ return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index a2738e8..9f9d74b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -43,6 +43,7 @@
* Requests the remaining LSM disk components files from active remote replicas.
*
* @throws IOException
+ * @throws InterruptedException
*/
- public void completeFailbackProcess() throws IOException;
+ public void completeFailbackProcess() throws IOException, InterruptedException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 755fbbd..6bd1505 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.replication;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.asterix.common.transactions.ILogRecord;
@@ -31,8 +32,9 @@
*
* @param logRecord
* The log record to be replicated,
+ * @throws InterruptedException
*/
- public void replicateLog(ILogRecord logRecord);
+ public void replicateLog(ILogRecord logRecord) throws InterruptedException;
/**
* Checks whether a log record has been replicated
@@ -79,8 +81,10 @@
/**
* Starts processing of ASYNC replication jobs as well as Txn logs.
+ *
+ * @throws InterruptedException
*/
- public void startReplicationThreads();
+ public void startReplicationThreads() throws InterruptedException;
/**
* Checks and sets each remote replica state.
@@ -114,4 +118,13 @@
* @throws IOException
*/
public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
+
+ /**
+ * Transfers the contents of the {@code buffer} to active remote replicas.
+ * The transfer starts from the {@code buffer} current position to its limit.
+ * After the transfer, the {@code buffer} position will be its limit.
+ *
+ * @param buffer the buffer whose remaining contents will be transferred to the replicas
+ */
+ public void replicateTxnLogBatch(ByteBuffer buffer);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index 4c3f728..b8fe4b2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -62,8 +62,7 @@
public InetSocketAddress getAddress(AsterixReplicationProperties asterixReplicationProperties) {
String replicaIPAddress = node.getClusterIp();
int replicationPort = asterixReplicationProperties.getDataReplicationPort(node.getId());
- InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
- return replicaAddress;
+ return InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
}
public static Replica create(DataInput input) throws IOException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 3738cd1..1992a00 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -110,9 +110,7 @@
public String getNodeId();
- public int writeRemoteRecoveryLog(ByteBuffer buffer);
-
- public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog);
+ public void readRemoteLog(ByteBuffer buffer);
public void setReplicationThread(IReplicationThread replicationThread);
@@ -120,11 +118,7 @@
public byte getLogSource();
- public int getSerializedLogSize();
-
- public void writeLogRecord(ByteBuffer buffer, long appendLSN);
-
- public ByteBuffer getSerializedLog();
+ public int getRemoteLogSize();
public void setNodeId(String nodeId);
@@ -138,4 +132,6 @@
* @return a flag indicating whether the log record should be sent to remote replicas
*/
public boolean isReplicated();
+
+ public void writeRemoteLogRecord(ByteBuffer buffer);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index fd56913..4823a92 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -29,7 +29,7 @@
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
-/*
+/**
* == LogRecordFormat ==
* ---------------------------
* [Header1] (6 bytes) : for all log types
@@ -44,8 +44,7 @@
* PKValueSize(4)
* PKValue(PKValueSize)
* ---------------------------
- * [Header3] (20 bytes) : only for update log type
- * PrevLSN(8)
+ * [Header3] (12 bytes) : only for update log type
* ResourceId(8) //stored in .metadata of the corresponding index in NC node
* LogRecordSize(4)
* ---------------------------
@@ -61,17 +60,35 @@
* = LogSize =
* 1) JOB_COMMIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
* 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: (Header1(6) + Header2(16) + Tail(8)) + PKValueSize
- * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
+ * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
* 3) UPDATE: (Header1(6) + Header2(16) + + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize
- * --> UPDATE_LOG_BASE_SIZE = 59
+ * --> UPDATE_LOG_BASE_SIZE = 59
* 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8))
* 5) WAIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
- * --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol
- * it also includes LogSource and JobId fields.
+ * --> WAIT_LOG only requires the LogType field, but in order to conform to the log reader protocol
+ * it also includes LogSource and JobId fields.
*/
public class LogRecord implements ILogRecord {
+ private static final int LOG_SOURCE_LEN = Byte.BYTES;
+ private static final int TYPE_LEN = Byte.BYTES;
+ public static final int PKHASH_LEN = Integer.BYTES;
+ public static final int PKSZ_LEN = Integer.BYTES;
+ private static final int RS_PARTITION_LEN = Integer.BYTES;
+ private static final int RSID_LEN = Long.BYTES;
+ private static final int LOGRCD_SZ_LEN = Integer.BYTES;
+ private static final int FLDCNT_LEN = Integer.BYTES;
+ private static final int NEWOP_LEN = Byte.BYTES;
+ private static final int NEWVALSZ_LEN = Integer.BYTES;
+ private static final int CHKSUM_LEN = Long.BYTES;
+
+ private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+ private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
+ + PKSZ_LEN;
+ private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+ private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+
// ------------- fields in a log record (begin) ------------//
private byte logSource;
private byte logType;
@@ -101,9 +118,8 @@
private int[] PKFields;
private PrimaryIndexOperationTracker opTracker;
private IReplicationThread replicationThread;
- private ByteBuffer serializedLog;
/**
- * The fields (numOfFlushedIndexes and nodeId) are used for serialized flush logs only
+ * The fields (numOfFlushedIndexes and nodeId) are used for remote flush logs only
* to indicate the source of the log and how many indexes were flushed using its LSN.
*/
private int numOfFlushedIndexes;
@@ -119,25 +135,6 @@
logSource = LogSource.LOCAL;
}
- private final static int LOG_SOURCE_LEN = Byte.BYTES;
- private final static int TYPE_LEN = Byte.BYTES;
- public final static int PKHASH_LEN = Integer.BYTES;
- public final static int PKSZ_LEN = Integer.BYTES;
- private final static int RS_PARTITION_LEN = Integer.BYTES;
- private final static int RSID_LEN = Long.BYTES;
- private final static int LOGRCD_SZ_LEN = Integer.BYTES;
- private final static int FLDCNT_LEN = Integer.BYTES;
- private final static int NEWOP_LEN = Byte.BYTES;
- private final static int NEWVALSZ_LEN = Integer.BYTES;
- private final static int CHKSUM_LEN = Long.BYTES;
-
- private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
- private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
- + PKSZ_LEN;
- private final static int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
- private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
- private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
-
private void writeLogRecordCommonFields(ByteBuffer buffer) {
buffer.put(logSource);
buffer.put(logType);
@@ -173,39 +170,15 @@
buffer.putLong(checksum);
}
- // this method is used when replication is enabled to include the log record LSN in the serialized version
@Override
- public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
- int beginOffset = buffer.position();
+ public void writeRemoteLogRecord(ByteBuffer buffer) {
writeLogRecordCommonFields(buffer);
-
- if (replicated) {
- //copy the serialized log to send it to replicas
- int serializedLogSize = getSerializedLogSize();
- if (serializedLog == null || serializedLog.capacity() < serializedLogSize) {
- serializedLog = ByteBuffer.allocate(serializedLogSize);
- } else {
- serializedLog.clear();
- }
-
- int currentPosition = buffer.position();
- int currentLogSize = (currentPosition - beginOffset);
-
- buffer.position(beginOffset);
- buffer.get(serializedLog.array(), 0, currentLogSize);
- serializedLog.position(currentLogSize);
- if (logType == LogType.FLUSH) {
- serializedLog.putLong(appendLSN);
- serializedLog.putInt(numOfFlushedIndexes);
- serializedLog.putInt(nodeId.length());
- serializedLog.put(nodeId.getBytes());
- }
- serializedLog.flip();
- buffer.position(currentPosition);
+ if (logType == LogType.FLUSH) {
+ buffer.putLong(LSN);
+ buffer.putInt(numOfFlushedIndexes);
+ buffer.putInt(nodeId.length());
+ buffer.put(nodeId.getBytes());
}
-
- checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
- buffer.putLong(checksum);
}
private void writePKValue(ByteBuffer buffer) {
@@ -221,8 +194,12 @@
}
private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
- tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
- // writeTuple() doesn't change the position of the buffer.
+ if (logSource == LogSource.LOCAL) {
+ tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+ } else {
+ //since the tuple is already serialized in remote logs, just copy it from beginning to end.
+ System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), buffer.position(), size);
+ }
buffer.position(buffer.position() + size);
}
@@ -323,47 +300,19 @@
}
@Override
- public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog) {
- int beginOffset = buffer.position();
-
+ public void readRemoteLog(ByteBuffer buffer) {
//read common fields
- RecordReadStatus status = readLogCommonFields(buffer);
- if (status != RecordReadStatus.OK) {
- buffer.position(beginOffset);
- return status;
- }
+ readLogCommonFields(buffer);
if (logType == LogType.FLUSH) {
- if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) {
- LSN = buffer.getLong();
- numOfFlushedIndexes = buffer.getInt();
- //read serialized node id
- int nodeIdLength = buffer.getInt();
- if (buffer.remaining() >= nodeIdLength) {
- byte[] nodeIdBytes = new byte[nodeIdLength];
- buffer.get(nodeIdBytes);
- nodeId = new String(nodeIdBytes);
- } else {
- buffer.position(beginOffset);
- return RecordReadStatus.TRUNCATED;
- }
- } else {
- buffer.position(beginOffset);
- return RecordReadStatus.TRUNCATED;
- }
+ LSN = buffer.getLong();
+ numOfFlushedIndexes = buffer.getInt();
+ //read serialized node id
+ int nodeIdLength = buffer.getInt();
+ byte[] nodeIdBytes = new byte[nodeIdLength];
+ buffer.get(nodeIdBytes);
+ nodeId = new String(nodeIdBytes);
}
-
- //remote recovery logs need to have the LSN to check which should be replayed
- if (remoteRecoveryLog) {
- if (buffer.remaining() >= Long.BYTES) {
- LSN = buffer.getLong();
- } else {
- buffer.position(beginOffset);
- return RecordReadStatus.TRUNCATED;
- }
- }
-
- return RecordReadStatus.OK;
}
private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -445,16 +394,6 @@
return builder.toString();
}
- @Override
- public int writeRemoteRecoveryLog(ByteBuffer buffer) {
- int bufferBegin = buffer.position();
- writeLogRecordCommonFields(buffer);
- //FLUSH logs should not included in remote recovery
- //LSN must be included in all remote recovery logs
- buffer.putLong(LSN);
- return buffer.position() - bufferBegin;
- }
-
////////////////////////////////////////////
// getter and setter methods
////////////////////////////////////////////
@@ -535,18 +474,18 @@
}
@Override
- public int getSerializedLogSize() {
- int serilizedSize = logSize;
+ public int getRemoteLogSize() {
+ int remoteLogSize = logSize;
if (logType == LogType.FLUSH) {
//LSN
- serilizedSize += Long.BYTES;
+ remoteLogSize += Long.BYTES;
//num of indexes
- serilizedSize += Integer.BYTES;
+ remoteLogSize += Integer.BYTES;
//serialized node id String
- serilizedSize += Integer.BYTES + nodeId.length();
+ remoteLogSize += Integer.BYTES + nodeId.length();
}
- serilizedSize -= CHKSUM_LEN;
- return serilizedSize;
+ remoteLogSize -= CHKSUM_LEN;
+ return remoteLogSize;
}
@Override
@@ -631,15 +570,6 @@
}
@Override
- public ByteBuffer getSerializedLog() {
- return serializedLog;
- }
-
- public void setSerializedLog(ByteBuffer serializedLog) {
- this.serializedLog = serializedLog;
- }
-
- @Override
public String getNodeId() {
return nodeId;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 588968c..283f69f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -18,44 +18,37 @@
*/
package org.apache.asterix.replication.logging;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.management.ReplicationManager;
public class ReplicationLogBuffer {
private final int logBufferSize;
private final AtomicBoolean full;
private int appendOffset;
- private int flushOffset;
+ private int replicationOffset;
private final ByteBuffer appendBuffer;
- private final ByteBuffer flushBuffer;
+ private final ByteBuffer replicationBuffer;
private boolean stop;
- private Map<String, SocketChannel> replicaSockets;
private ReplicationManager replicationManager;
+ private final int batchSize;
- public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize) {
+ public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize, int batchSize) {
this.replicationManager = replicationManager;
this.logBufferSize = logBufferSize;
+ this.batchSize = batchSize;
appendBuffer = ByteBuffer.allocate(logBufferSize);
- flushBuffer = appendBuffer.duplicate();
+ replicationBuffer = appendBuffer.duplicate();
full = new AtomicBoolean(false);
appendOffset = 0;
- flushOffset = 0;
+ replicationOffset = 0;
}
public void append(ILogRecord logRecord) {
- appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
- appendBuffer.putInt(logRecord.getSerializedLogSize());
- appendBuffer.put(logRecord.getSerializedLog());
+ appendBuffer.putInt(logRecord.getRemoteLogSize());
+ logRecord.writeRemoteLogRecord(appendBuffer);
synchronized (this) {
appendOffset += getLogReplicationSize(logRecord);
@@ -63,10 +56,6 @@
}
}
- public void setReplicationSockets(Map<String, SocketChannel> replicaSockets) {
- this.replicaSockets = replicaSockets;
- }
-
public synchronized void isFull(boolean full) {
this.full.set(full);
this.notify();
@@ -77,18 +66,18 @@
}
private static int getLogReplicationSize(ILogRecord logRecord) {
- //request type + request length + serialized log length
- return Integer.BYTES + Integer.BYTES + logRecord.getSerializedLogSize();
+ //log length (4 bytes) + remote log size
+ return Integer.BYTES + logRecord.getRemoteLogSize();
}
public void reset() {
appendBuffer.position(0);
appendBuffer.limit(logBufferSize);
- flushBuffer.position(0);
- flushBuffer.limit(logBufferSize);
+ replicationBuffer.position(0);
+ replicationBuffer.limit(logBufferSize);
full.set(false);
appendOffset = 0;
- flushOffset = 0;
+ replicationOffset = 0;
stop = false;
}
@@ -96,57 +85,66 @@
int endOffset;
while (!full.get()) {
synchronized (this) {
- if (appendOffset - flushOffset == 0 && !full.get()) {
+ if (appendOffset - replicationOffset == 0 && !full.get()) {
try {
if (stop) {
break;
}
this.wait();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
continue;
}
}
endOffset = appendOffset;
}
- internalFlush(flushOffset, endOffset);
+ internalFlush(replicationOffset, endOffset);
}
-
- internalFlush(flushOffset, appendOffset);
+ internalFlush(replicationOffset, appendOffset);
}
private void internalFlush(int beginOffset, int endOffset) {
if (endOffset > beginOffset) {
- int begingPos = flushBuffer.position();
- flushBuffer.limit(endOffset);
- sendRequest(replicaSockets, flushBuffer);
- flushBuffer.position(begingPos + (endOffset - beginOffset));
- flushOffset = endOffset;
+ int begingPos = replicationBuffer.position();
+ replicationBuffer.limit(endOffset);
+ transferBuffer(replicationBuffer);
+ replicationBuffer.position(begingPos + (endOffset - beginOffset));
+ replicationOffset = endOffset;
}
}
- private void sendRequest(Map<String, SocketChannel> replicaSockets, ByteBuffer requestBuffer) {
- Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
- int begin = requestBuffer.position();
- while (iterator.hasNext()) {
- Entry<String, SocketChannel> replicaSocket = iterator.next();
- SocketChannel clientSocket = replicaSocket.getValue();
- try {
- NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer);
- } catch (IOException e) {
- if (clientSocket.isOpen()) {
- try {
- clientSocket.close();
- } catch (IOException e2) {
- e2.printStackTrace();
- }
- }
- replicationManager.reportFailedReplica(replicaSocket.getKey());
- iterator.remove();
- } finally {
- requestBuffer.position(begin);
- }
+ private void transferBuffer(ByteBuffer buffer) {
+ if (buffer.remaining() <= batchSize) {
+ //the current batch can be sent as it is
+ replicationManager.replicateTxnLogBatch(buffer);
+ return;
}
+ /**
+ * break the batch into smaller batches
+ */
+ int totalTransferLimit = buffer.limit();
+ while (buffer.hasRemaining()) {
+ if (buffer.remaining() > batchSize) {
+ //mark the beginning of this batch
+ buffer.mark();
+ int currentBatchSize = 0;
+ while (currentBatchSize < batchSize) {
+ int logSize = replicationBuffer.getInt();
+ //add the size of the log record itself + 4 bytes for its size
+ currentBatchSize += logSize + Integer.BYTES;
+ //go to the beginning of the next log
+ buffer.position(buffer.position() + logSize);
+ }
+ //set the limit to the end of this batch
+ buffer.limit(buffer.position());
+ //return to the beginning of the batch position
+ buffer.reset();
+ }
+ replicationManager.replicateTxnLogBatch(buffer);
+ //return the original limit to check the new remaining size
+ buffer.limit(totalTransferLimit);
+ }
}
public boolean isStop() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
similarity index 71%
rename from asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
rename to asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
index 3312cb1..118fde6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
@@ -21,24 +21,22 @@
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.transactions.LogRecord;
-
/**
* This class is responsible for sending transactions logs to remote replicas.
*/
-public class ReplicationLogFlusher implements Callable<Boolean> {
- private final static Logger LOGGER = Logger.getLogger(ReplicationLogFlusher.class.getName());
- private final static ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null,
- LogRecord.JOB_TERMINATE_LOG_SIZE);
+public class TxnLogReplicator implements Callable<Boolean> {
+ private static final Logger LOGGER = Logger.getLogger(TxnLogReplicator.class.getName());
+ private static final ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null, 0, 0);
private final LinkedBlockingQueue<ReplicationLogBuffer> emptyQ;
private final LinkedBlockingQueue<ReplicationLogBuffer> flushQ;
private ReplicationLogBuffer flushPage;
private final AtomicBoolean isStarted;
private final AtomicBoolean terminateFlag;
- public ReplicationLogFlusher(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
+ public TxnLogReplicator(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
LinkedBlockingQueue<ReplicationLogBuffer> flushQ) {
this.emptyQ = emptyQ;
this.flushQ = flushQ;
@@ -54,7 +52,7 @@
try {
isStarted.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
}
@@ -74,34 +72,37 @@
@Override
public Boolean call() {
- Thread.currentThread().setName("Replication Log Flusher");
+ Thread.currentThread().setName("TxnLog Replicator");
synchronized (isStarted) {
isStarted.set(true);
isStarted.notify();
}
- try {
- while (true) {
+
+ while (true) {
+ try {
+ if (terminateFlag.get()) {
+ return true;
+ }
+
flushPage = null;
- try {
- flushPage = flushQ.take();
- if (flushPage == POISON_PILL || terminateFlag.get()) {
- return true;
- }
- } catch (InterruptedException e) {
- if (flushPage == null) {
- continue;
- }
+ flushPage = flushQ.take();
+ if (flushPage == POISON_PILL) {
+ continue;
}
flushPage.flush();
// TODO: pool large pages
if (flushPage.getLogBufferSize() == flushPage.getReplicationManager().getLogPageSize()) {
emptyQ.offer(flushPage);
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "TxnLogReplicator is terminating abnormally. Logs Replication Stopped.",
+ e);
+ }
+ throw e;
}
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.severe("ReplicationLogFlusher is terminating abnormally. Logs Replication Stopped.");
- throw e;
}
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 6023cb1..62c1e4a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -21,6 +21,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.nio.ByteBuffer;
@@ -31,6 +32,10 @@
public class NetworkingUtil {
+ private NetworkingUtil() {
+ throw new AssertionError("This util class should not be initialized.");
+ }
+
public static void readBytes(SocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException {
byteBuffer.clear();
byteBuffer.limit(length);
@@ -88,7 +93,8 @@
return hostName;
}
- public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer) throws IOException {
+ public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer)
+ throws IOException {
while (requestBuffer.hasRemaining()) {
socketChannel.write(requestBuffer);
}
@@ -107,4 +113,10 @@
long fileSize = fileChannel.size();
fileChannel.transferFrom(socketChannel, pos, fileSize);
}
+
+ public static InetSocketAddress getSocketAddress(SocketChannel socketChannel) {
+ String hostAddress = socketChannel.socket().getInetAddress().getHostAddress();
+ int port = socketChannel.socket().getPort();
+ return InetSocketAddress.createUnresolved(hostAddress, port);
+ }
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index a152f6c..cabfc77 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -83,6 +83,7 @@
public class ReplicationChannel extends Thread implements IReplicationChannel {
private static final Logger LOGGER = Logger.getLogger(ReplicationChannel.class.getName());
+ private static final int LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE = 1;
private final ExecutorService replicationThreads;
private final String localNodeID;
private final ILogManager logManager;
@@ -91,8 +92,8 @@
private ServerSocketChannel serverSocketChannel = null;
private final IReplicationManager replicationManager;
private final AsterixReplicationProperties replicationProperties;
- private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
- private final static int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+ private final IAsterixAppRuntimeContextProvider appContextProvider;
+ private static final int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
@@ -110,10 +111,10 @@
this.replicaResourcesManager = (ReplicaResourcesManager) replicaResoucesManager;
this.replicationManager = replicationManager;
this.replicationProperties = replicationProperties;
- this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
- lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<LSMComponentLSNSyncTask>();
+ this.appContextProvider = asterixAppRuntimeContextProvider;
+ lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>();
pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
- lsmComponentId2PropertiesMap = new ConcurrentHashMap<String, LSMComponentProperties>();
+ lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
lsmComponentLSNMappingService = new LSMComponentsSyncService();
replicationNotifier = new ReplicationNotifier();
@@ -166,16 +167,17 @@
//clean up when all the LSM component files have been received.
if (remainingFile == 0) {
- if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null) {
- //if this LSN wont be used for any other index, remove it
- if (replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
- int remainingIndexes = replicaUniqueLSN2RemoteMapping
- .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
- if (remainingIndexes == 0) {
- //Note: there is a chance that this is never deleted because some index in the dataset was not flushed because it is empty.
- //This could be solved by passing only the number of successfully flushed indexes
- replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
- }
+ if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
+ && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
+ int remainingIndexes = replicaUniqueLSN2RemoteMapping
+ .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
+ if (remainingIndexes == 0) {
+ /**
+ * Note: there is a chance that this will never be removed because some
+ * index in the dataset was not flushed because it is empty. This could
+ * be solved by passing only the number of successfully flushed indexes.
+ */
+ replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
}
}
@@ -242,20 +244,23 @@
case FLUSH_INDEX:
handleFlushIndex();
break;
- default: {
+ default:
throw new IllegalStateException("Unknown replication request");
- }
}
replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
}
} catch (Exception e) {
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Unexpected error during replication.", e);
+ }
} finally {
if (socketChannel.isOpen()) {
try {
socketChannel.close();
} catch (IOException e) {
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Failed to close replication socket.", e);
+ }
}
}
}
@@ -263,15 +268,17 @@
private void handleFlushIndex() throws IOException {
inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
- //1. read which indexes are requested to be flushed from remote replica
+ //read which indexes are requested to be flushed from remote replica
ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
- //2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
- IDatasetLifecycleManager datasetLifeCycleManager = asterixAppRuntimeContextProvider
- .getDatasetLifecycleManager();
+ /**
+ * check which indexes can be flushed (open indexes) and which cannot be
+ * flushed (closed or have empty memory component).
+ */
+ IDatasetLifecycleManager datasetLifeCycleManager = appContextProvider.getDatasetLifecycleManager();
List<IndexInfo> openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo();
- Set<Integer> datasetsToForceFlush = new HashSet<Integer>();
+ Set<Integer> datasetsToForceFlush = new HashSet<>();
for (IndexInfo iInfo : openIndexesInfo) {
if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
@@ -281,7 +288,10 @@
//remove index to indicate that it will be flushed
requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
} else if (!((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()) {
- //if an index has something to be flushed, then the request to flush it will succeed and we need to schedule it to be flushed.
+ /**
+ * if an index has something to be flushed, then the request to flush it
+ * will succeed and we need to schedule it to be flushed.
+ */
datasetsToForceFlush.add(iInfo.getDatasetId());
//remove index to indicate that it will be flushed
requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
@@ -289,13 +299,13 @@
}
}
- //3. force flush datasets requested to be flushed
+ //schedule flush for datasets requested to be flushed
for (int datasetId : datasetsToForceFlush) {
datasetLifeCycleManager.flushDataset(datasetId, true);
}
//the remaining indexes in the requested set are those which cannot be flushed.
- //4. respond back to the requester that those indexes cannot be flushed
+ //respond back to the requester that those indexes cannot be flushed
ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
@@ -363,7 +373,7 @@
List<String> filesList;
Set<String> replicaIds = request.getReplicaIds();
Set<String> requesterExistingFiles = request.getExistingFiles();
- Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+ Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) appContextProvider
.getAppContext()).getMetadataProperties().getNodePartitions();
for (String replicaId : replicaIds) {
//get replica partitions
@@ -414,50 +424,70 @@
}
private void handleLogReplication() throws IOException, ACIDException {
- inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ //set initial buffer size to a log buffer page size
+ inBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
+ while (true) {
+ //read a batch of logs
+ inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+ //check if it is end of handshake (a single byte log)
+ if (inBuffer.remaining() == LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE) {
+ break;
+ }
- //Deserialize log
- remoteLog.readRemoteLog(inBuffer, false);
- remoteLog.setLogSource(LogSource.REMOTE);
-
- switch (remoteLog.getLogType()) {
- case LogType.UPDATE:
- case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
- //if the log partition belongs to a partitions hosted on this node, replicate it
- if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
- logManager.log(remoteLog);
- }
- break;
- case LogType.JOB_COMMIT:
- case LogType.ABORT:
- LogRecord jobTerminationLog = new LogRecord();
- TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
- remoteLog.getLogType() == LogType.JOB_COMMIT);
- jobTerminationLog.setReplicationThread(this);
- jobTerminationLog.setLogSource(LogSource.REMOTE);
- logManager.log(jobTerminationLog);
- break;
- case LogType.FLUSH:
- //store mapping information for flush logs to use them in incoming LSM components.
- RemoteLogMapping flushLogMap = new RemoteLogMapping();
- flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
- flushLogMap.setRemoteLSN(remoteLog.getLSN());
- logManager.log(remoteLog);
- //the log LSN value is updated by logManager.log(.) to a local value
- flushLogMap.setLocalLSN(remoteLog.getLSN());
- flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
- replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
- synchronized (flushLogslock) {
- flushLogslock.notify();
- }
- break;
- default:
- LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+ processLogsBatch(inBuffer);
}
}
- //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and JOB_ABORT log types.
+ private void processLogsBatch(ByteBuffer buffer) throws ACIDException {
+ while (buffer.hasRemaining()) {
+ //get rid of log size
+ inBuffer.getInt();
+ //Deserialize log
+ remoteLog.readRemoteLog(inBuffer);
+ remoteLog.setLogSource(LogSource.REMOTE);
+
+ switch (remoteLog.getLogType()) {
+ case LogType.UPDATE:
+ case LogType.ENTITY_COMMIT:
+ case LogType.UPSERT_ENTITY_COMMIT:
+ //if the log partition belongs to a partitions hosted on this node, replicate it
+ if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+ logManager.log(remoteLog);
+ }
+ break;
+ case LogType.JOB_COMMIT:
+ case LogType.ABORT:
+ LogRecord jobTerminationLog = new LogRecord();
+ TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
+ remoteLog.getLogType() == LogType.JOB_COMMIT);
+ jobTerminationLog.setReplicationThread(this);
+ jobTerminationLog.setLogSource(LogSource.REMOTE);
+ logManager.log(jobTerminationLog);
+ break;
+ case LogType.FLUSH:
+ //store mapping information for flush logs to use them in incoming LSM components.
+ RemoteLogMapping flushLogMap = new RemoteLogMapping();
+ flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+ flushLogMap.setRemoteLSN(remoteLog.getLSN());
+ logManager.log(remoteLog);
+ //the log LSN value is updated by logManager.log(.) to a local value
+ flushLogMap.setLocalLSN(remoteLog.getLSN());
+ flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+ replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+ synchronized (flushLogslock) {
+ flushLogslock.notify();
+ }
+ break;
+ default:
+ LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+ }
+ }
+ }
+
+ /**
+ * this method is called sequentially by LogPage (notifyReplicationTerminator)
+ * for JOB_COMMIT and JOB_ABORT log types.
+ */
@Override
public void notifyLogReplicationRequester(LogRecord logRecord) {
pendingNotificationRemoteLogsQ.offer(logRecord);
@@ -480,24 +510,27 @@
try {
LogRecord logRecord = pendingNotificationRemoteLogsQ.take();
//send ACK to requester
- try {
- logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
- .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId()
- + System.lineSeparator()).getBytes());
- } catch (IOException e) {
- LOGGER.severe("Failed to send job replication ACK " + logRecord.getLogRecordForDisplay());
+ logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
+ .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId()
+ + System.lineSeparator()).getBytes());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Failed to send job replication ACK", e);
}
- } catch (InterruptedException e1) {
- LOGGER.severe("ReplicationNotifier Thread interrupted.");
}
}
}
}
/**
- * This thread is responsible for synchronizing the LSN of the received LSM components to a local LSN.
+ * This thread is responsible for synchronizing the LSN of
+ * the received LSM components to a local LSN.
*/
private class LSMComponentsSyncService extends Thread {
+ private static final int BULKLOAD_LSN = 0;
+
@Override
public void run() {
Thread.currentThread().setName("LSMComponentsSyncService Thread");
@@ -506,23 +539,24 @@
try {
LSMComponentLSNSyncTask syncTask = lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take();
LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(syncTask.getComponentId());
- try {
- syncLSMComponentFlushLSN(lsmCompProp, syncTask);
- updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
- } catch (Exception e) {
- e.printStackTrace();
- }
+ syncLSMComponentFlushLSN(lsmCompProp, syncTask);
+ updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", e);
+ }
}
+
}
}
private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask)
- throws Exception {
+ throws InterruptedException, IOException {
long remoteLSN = lsmCompProp.getOriginalLSN();
//LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it
- if (remoteLSN == 0) {
+ if (remoteLSN == BULKLOAD_LSN) {
//since this is the first LSM component of this index,
//then set the mapping in the LSN_MAP to the current log LSN because
//no other log could've been received for this index since bulkload replication is synchronous.
@@ -536,16 +570,21 @@
if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
//need to look up LSN mapping from memory
RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
- if (remoteLogMap == null) {
+ //wait until flush log arrives, and verify the LSM component file still exists
+ //The component file could be deleted if its NC fails.
+ while (remoteLogMap == null && Files.exists(path)) {
synchronized (flushLogslock) {
- remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
- //wait until flush log arrives, and verify the LSM component file still exists
- //The component file could be deleted if its NC fails.
- while (remoteLogMap == null && Files.exists(path)) {
- flushLogslock.wait();
- remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
- }
+ flushLogslock.wait();
}
+ remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+ }
+
+ /**
+ * file has been deleted due to its remote primary replica failure
+ * before its LSN could've been synchronized.
+ */
+ if (remoteLogMap == null) {
+ return;
}
lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
} else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
@@ -554,13 +593,14 @@
.getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(replicaResourcesManager));
Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN());
if (mappingLSN == null) {
- /*
- * this shouldn't happen unless this node just recovered and the first component it received
- * is a merged component due to an on-going merge operation while recovery on the remote replica.
- * In this case, we use the current append LSN since no new records exist for this index,
- * otherwise they would've been flushed.
- * This could be prevented by waiting for any IO to finish on the remote replica during recovery.
- *
+ /**
+ * this shouldn't happen unless this node just recovered and
+ * the first component it received is a merged component due
+ * to an on-going merge operation while recovery on the remote
+ * replica. In this case, we use the current append LSN since
+ * no new records exist for this index, otherwise they would've
+ * been flushed. This could be prevented by waiting for any IO
+ * to finish on the remote replica during recovery.
*/
mappingLSN = logManager.getAppendLSN();
}
@@ -569,9 +609,10 @@
}
if (Files.notExists(path)) {
- /*
- * This could happen when a merged component arrives and deletes the flushed
- * component (which we are trying to update) before its flush log arrives since logs and components are received
+ /**
+ * This could happen when a merged component arrives and deletes
+ * the flushed component (which we are trying to update) before
+ * its flush log arrives since logs and components are received
* on different threads.
*/
return;
@@ -594,4 +635,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index ee872a5..5ba6ad2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -71,7 +71,7 @@
import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.logging.ReplicationLogBuffer;
-import org.apache.asterix.replication.logging.ReplicationLogFlusher;
+import org.apache.asterix.replication.logging.TxnLogReplicator;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
@@ -86,6 +86,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
/**
* This class is used to process replication jobs and maintain remote replicas states
@@ -93,7 +95,7 @@
public class ReplicationManager implements IReplicationManager {
private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName());
- private final int INITIAL_REPLICATION_FACTOR = 1;
+ private static final int INITIAL_REPLICATION_FACTOR = 1;
private final String nodeId;
private ExecutorService replicationListenerThreads;
private final Map<Integer, Set<String>> jobCommitAcks;
@@ -114,7 +116,7 @@
private final AtomicBoolean replicationSuspended;
private AtomicBoolean terminateJobsReplication;
private AtomicBoolean jobsReplicationSuspended;
- private final static int INITIAL_BUFFER_SIZE = 4000; //4KB
+ private static final int INITIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
private final Set<String> shuttingDownReplicaIds;
//replication threads
private ReplicationJobsProccessor replicationJobsProcessor;
@@ -128,9 +130,10 @@
private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
protected ReplicationLogBuffer currentTxnLogBuffer;
- private ReplicationLogFlusher txnlogsReplicator;
+ private TxnLogReplicator txnlogReplicator;
private Future<? extends Object> txnLogReplicatorTask;
- private Map<String, SocketChannel> logsReplicaSockets = null;
+ private SocketChannel[] logsRepSockets;
+ private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
//TODO this class needs to be refactored by moving its private classes to separate files
//and possibly using MessageBroker to send/receive remote replicas events.
@@ -143,15 +146,15 @@
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
this.logManager = logManager;
- replicationJobsQ = new LinkedBlockingQueue<IReplicationJob>();
- replicaEventsQ = new LinkedBlockingQueue<ReplicaEvent>();
+ replicationJobsQ = new LinkedBlockingQueue<>();
+ replicaEventsQ = new LinkedBlockingQueue<>();
terminateJobsReplication = new AtomicBoolean(false);
jobsReplicationSuspended = new AtomicBoolean(true);
replicationSuspended = new AtomicBoolean(true);
- replicas = new HashMap<String, Replica>();
- jobCommitAcks = new ConcurrentHashMap<Integer, Set<String>>();
- replicationJobsPendingAcks = new ConcurrentHashMap<Integer, ILogRecord>();
- shuttingDownReplicaIds = new HashSet<String>();
+ replicas = new HashMap<>();
+ jobCommitAcks = new ConcurrentHashMap<>();
+ replicationJobsPendingAcks = new ConcurrentHashMap<>();
+ shuttingDownReplicaIds = new HashSet<>();
dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
//Used as async listeners from replicas
@@ -179,13 +182,14 @@
clientPartitonsSet.addAll(clientPartitions);
replica2PartitionsMap.put(replica.getId(), clientPartitonsSet);
}
- int numLogBuffers = logManager.getNumLogPages();
- emptyLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
- pendingFlushLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
+ int numLogBuffers = replicationProperties.getLogBufferNumOfPages();
+ emptyLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers);
+ pendingFlushLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers);
- int logBufferSize = logManager.getLogPageSize();
+ int logBufferSize = replicationProperties.getLogBufferPageSize();
for (int i = 0; i < numLogBuffers; i++) {
- emptyLogBuffersQ.offer(new ReplicationLogBuffer(this, logBufferSize));
+ emptyLogBuffersQ
+ .offer(new ReplicationLogBuffer(this, logBufferSize, replicationProperties.getLogBatchSize()));
}
}
@@ -200,7 +204,7 @@
try {
replicationSuspended.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
}
@@ -209,16 +213,12 @@
}
@Override
- public void replicateLog(ILogRecord logRecord) {
+ public void replicateLog(ILogRecord logRecord) throws InterruptedException {
if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
//if replication is suspended, wait until it is resumed.
while (replicationSuspended.get()) {
synchronized (replicationSuspended) {
- try {
- replicationSuspended.wait();
- } catch (InterruptedException e) {
- //ignore
- }
+ replicationSuspended.wait();
}
}
Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>());
@@ -232,29 +232,23 @@
protected void getAndInitNewLargePage(int pageSize) {
// for now, alloc a new buffer for each large page
// TODO: consider pooling large pages
- currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize);
- currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
+ currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, replicationProperties.getLogBufferPageSize());
pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
}
- protected void getAndInitNewPage() {
+ protected void getAndInitNewPage() throws InterruptedException {
currentTxnLogBuffer = null;
while (currentTxnLogBuffer == null) {
- try {
- currentTxnLogBuffer = emptyLogBuffersQ.take();
- } catch (InterruptedException e) {
- //ignore
- }
+ currentTxnLogBuffer = emptyLogBuffersQ.take();
}
currentTxnLogBuffer.reset();
- currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
}
- private synchronized void appendToLogBuffer(ILogRecord logRecord) {
+ private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException {
if (!currentTxnLogBuffer.hasSpace(logRecord)) {
currentTxnLogBuffer.isFull(true);
- if (logRecord.getLogSize() > logManager.getLogPageSize()) {
+ if (logRecord.getLogSize() > getLogPageSize()) {
getAndInitNewLargePage(logRecord.getLogSize());
} else {
getAndInitNewPage();
@@ -326,7 +320,10 @@
long fileSize = fileChannel.size();
if (LSMComponentJob != null) {
- //since this is LSM_COMPONENT REPLICATE job, the job will contain only the component being replicated.
+ /**
+ * since this is LSM_COMPONENT REPLICATE job, the job will contain
+ * only the component being replicated.
+ */
ILSMComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext()
.getComponentsToBeReplicated().get(0);
long LSNByteOffset = AsterixLSMIndexUtil.getComponentFileLSNOffset(
@@ -362,7 +359,7 @@
}
}
} catch (IOException e) {
- reportFailedReplica(entry.getKey());
+ handleReplicationFailure(socketChannel, e);
iterator.remove();
} finally {
requestBuffer.position(0);
@@ -392,7 +389,7 @@
waitForResponse(socketChannel, responseBuffer);
}
} catch (IOException e) {
- reportFailedReplica(entry.getKey());
+ handleReplicationFailure(socketChannel, e);
iterator.remove();
} finally {
requestBuffer.position(0);
@@ -458,7 +455,7 @@
}
/**
- * Suspends proccessing replication jobs.
+ * Suspends processing replication jobs/logs.
*
* @param force
* a flag indicates if replication should be suspended right away or when the pending jobs are completed.
@@ -477,60 +474,134 @@
try {
jobsReplicationSuspended.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
}
}
//suspend logs replication
- if (txnlogsReplicator != null) {
- terminateTxnLogsReplicator();
+ if (txnlogReplicator != null) {
+ endTxnLogReplicationHandshake();
}
}
/**
* Opens a new connection with Active remote replicas and starts a listen thread per connection.
*/
- private void establishTxnLogsReplicationConnection() {
- logsReplicaSockets = getActiveRemoteReplicasSockets();
+ private void establishTxnLogReplicationHandshake() {
+ Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets();
+ logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()];
+ int i = 0;
//start a listener thread per connection
- for (Entry<String, SocketChannel> entry : logsReplicaSockets.entrySet()) {
+ for (Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) {
+ logsRepSockets[i] = entry.getValue();
replicationListenerThreads
.execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue()));
+ i++;
+ }
+
+ /**
+ * establish log replication handshake
+ */
+ ByteBuffer handshakeBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE)
+ .putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+ handshakeBuffer.flip();
+ //send handshake request
+ for (SocketChannel replicaSocket : logsRepSockets) {
+ try {
+ NetworkingUtil.transferBufferToChannel(replicaSocket, handshakeBuffer);
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
+ } finally {
+ handshakeBuffer.position(0);
+ }
}
}
+ private void handleReplicationFailure(SocketChannel socketChannel, Throwable t) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Could not complete replication request.", t);
+ }
+ if (socketChannel.isOpen()) {
+ try {
+ socketChannel.close();
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Could not close socket.", e);
+ }
+ }
+ }
+ reportFailedReplica(getReplicaIdBySocket(socketChannel));
+ }
+
/**
- * Stops ReplicationFlusherThread and closes the sockets used to replicate logs.
+ * Stops TxnLogReplicator and closes the sockets used to replicate logs.
*/
- private void terminateTxnLogsReplicator() {
- LOGGER.log(Level.INFO, "Terminating ReplicationLogFlusher thread ...");
- txnlogsReplicator.terminate();
+ private void endTxnLogReplicationHandshake() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Terminating TxnLogReplicator thread ...");
+ }
+ txnlogReplicator.terminate();
try {
txnLogReplicatorTask.get();
} catch (ExecutionException | InterruptedException e) {
- LOGGER.log(Level.WARNING, "RepicationLogFlusher thread terminated abnormally");
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "TxnLogReplicator thread terminated abnormally", e);
+ }
}
- LOGGER.log(Level.INFO, "LogFlusher thread is terminated.");
- if (logsReplicaSockets != null) {
- //wait for any ACK to arrive before closing sockets.
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("TxnLogReplicator thread was terminated.");
+ }
+
+ /**
+ * End log replication handshake (by sending a dummy log with a single byte)
+ */
+ ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.SIZE + 1).putInt(1).put((byte) 0);
+ endLogRepHandshake.flip();
+ for (SocketChannel replicaSocket : logsRepSockets) {
+ try {
+ NetworkingUtil.transferBufferToChannel(replicaSocket, endLogRepHandshake);
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
+ } finally {
+ endLogRepHandshake.position(0);
+ }
+ }
+
+ //wait for any ACK to arrive before closing sockets.
+ if (logsRepSockets != null) {
synchronized (jobCommitAcks) {
- while (jobCommitAcks.size() != 0) {
- try {
+ try {
+ while (jobCommitAcks.size() != 0) {
jobCommitAcks.wait();
- } catch (InterruptedException e) {
- //ignore
}
+ } catch (InterruptedException e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Interrupted while waiting for jobs ACK", e);
+ }
+ Thread.currentThread().interrupt();
}
}
-
- //close log replication sockets
- closeReplicaSockets(logsReplicaSockets);
- logsReplicaSockets = null;
}
+
+ /**
+ * Close log replication sockets
+ */
+ ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+ for (SocketChannel replicaSocket : logsRepSockets) {
+ try {
+ //send goodbye to remote replica
+ NetworkingUtil.transferBufferToChannel(replicaSocket, goodbyeBuffer);
+ replicaSocket.close();
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
+ } finally {
+ goodbyeBuffer.position(0);
+ }
+ }
+ logsRepSockets = null;
}
/**
@@ -567,14 +638,7 @@
try {
NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer);
} catch (IOException e) {
- if (clientSocket.isOpen()) {
- try {
- clientSocket.close();
- } catch (IOException e2) {
- e2.printStackTrace();
- }
- }
- reportFailedReplica(replicaSocket.getKey());
+ handleReplicationFailure(clientSocket, e);
iterator.remove();
} finally {
requestBuffer.position(0);
@@ -600,7 +664,7 @@
try {
clientSocket.close();
} catch (IOException e) {
- e.printStackTrace();
+ handleReplicationFailure(clientSocket, e);
}
}
}
@@ -636,7 +700,7 @@
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- e.printStackTrace();
+ Thread.currentThread().interrupt();
}
}
}
@@ -651,8 +715,10 @@
* The new state of the replica.
* @param suspendReplication
* a flag indicating whether to suspend replication on state change or not.
+ * @throws InterruptedException
*/
- public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) {
+ public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication)
+ throws InterruptedException {
Replica replica = replicas.get(replicaId);
if (replica.getState() == newState) {
@@ -680,10 +746,8 @@
if (newState == ReplicaState.ACTIVE) {
replicationFactor++;
- } else if (newState == ReplicaState.DEAD) {
- if (replicationFactor > INITIAL_REPLICATION_FACTOR) {
- replicationFactor--;
- }
+ } else if (newState == ReplicaState.DEAD && replicationFactor > INITIAL_REPLICATION_FACTOR) {
+ replicationFactor--;
}
LOGGER.log(Level.WARNING, "Replica " + replicaId + " state changed to: " + newState.name()
@@ -702,22 +766,24 @@
* The remote replica id the ACK received from.
*/
private void addAckToJob(int jobId, String replicaId) {
- //add ACK to the job
- if (jobCommitAcks.containsKey(jobId)) {
- Set<String> replicaIds = jobCommitAcks.get(jobId);
- replicaIds.add(replicaId);
- } else {
- throw new IllegalStateException("Job ID not found in pending job commits " + jobId);
- }
+ synchronized (jobCommitAcks) {
+ //add ACK to the job
+ if (jobCommitAcks.containsKey(jobId)) {
+ Set<String> replicaIds = jobCommitAcks.get(jobId);
+ replicaIds.add(replicaId);
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Invalid job replication ACK received for jobId(" + jobId + ")");
+ }
+ return;
+ }
- //if got ACKs from all remote replicas, notify pending jobs if any
- if (jobCommitAcks.get(jobId).size() == replicationFactor) {
- synchronized (replicationJobsPendingAcks) {
- if (replicationJobsPendingAcks.containsKey(jobId)) {
- ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
- synchronized (pendingLog) {
- pendingLog.notify();
- }
+ //if got ACKs from all remote replicas, notify pending jobs if any
+
+ if (jobCommitAcks.get(jobId).size() == replicationFactor && replicationJobsPendingAcks.containsKey(jobId)) {
+ ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
+ synchronized (pendingLog) {
+ pendingLog.notify();
}
}
}
@@ -725,26 +791,25 @@
@Override
public boolean hasBeenReplicated(ILogRecord logRecord) {
- if (jobCommitAcks.containsKey(logRecord.getJobId())) {
- //check if all ACKs have been received
- if (jobCommitAcks.get(logRecord.getJobId()).size() == replicationFactor) {
- jobCommitAcks.remove(logRecord.getJobId());
+ int jobId = logRecord.getJobId();
+ if (jobCommitAcks.containsKey(jobId)) {
+ synchronized (jobCommitAcks) {
+ //check if all ACKs have been received
+ if (jobCommitAcks.get(jobId).size() == replicationFactor) {
+ jobCommitAcks.remove(jobId);
- if (replicationJobsPendingAcks.containsKey(logRecord.getJobId())) {
- replicationJobsPendingAcks.remove(logRecord);
- }
+ //remove from pending jobs if exists
+ replicationJobsPendingAcks.remove(jobId);
- //notify any threads waiting for all jobs to finish
- if (jobCommitAcks.size() == 0) {
- synchronized (jobCommitAcks) {
+ //notify any threads waiting for all jobs to finish
+ if (jobCommitAcks.size() == 0) {
jobCommitAcks.notifyAll();
}
+ return true;
+ } else {
+ replicationJobsPendingAcks.putIfAbsent(jobId, logRecord);
+ return false;
}
-
- return true;
- } else {
- replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord);
- return false;
}
}
@@ -753,13 +818,16 @@
}
private Map<String, SocketChannel> getActiveRemoteReplicasSockets() {
- Map<String, SocketChannel> replicaNodesSockets = new HashMap<String, SocketChannel>();
+ Map<String, SocketChannel> replicaNodesSockets = new HashMap<>();
for (Replica replica : replicas.values()) {
if (replica.getState() == ReplicaState.ACTIVE) {
try {
SocketChannel sc = getReplicaSocket(replica.getId());
replicaNodesSockets.put(replica.getId(), sc);
} catch (IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Could not get replica socket", e);
+ }
reportFailedReplica(replica.getId());
}
}
@@ -776,7 +844,7 @@
* @throws IOException
*/
private SocketChannel getReplicaSocket(String replicaId) throws IOException {
- Replica replica = replicas.get(replicaId);
+ Replica replica = replicationProperties.getReplicaById(replicaId);
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(true);
InetSocketAddress address = replica.getAddress(replicationProperties);
@@ -786,7 +854,7 @@
@Override
public Set<String> getDeadReplicasIds() {
- Set<String> replicasIds = new HashSet<String>();
+ Set<String> replicasIds = new HashSet<>();
for (Replica replica : replicas.values()) {
if (replica.getState() == ReplicaState.DEAD) {
replicasIds.add(replica.getNode().getId());
@@ -797,7 +865,7 @@
@Override
public Set<String> getActiveReplicasIds() {
- Set<String> replicasIds = new HashSet<String>();
+ Set<String> replicasIds = new HashSet<>();
for (Replica replica : replicas.values()) {
if (replica.getState() == ReplicaState.ACTIVE) {
replicasIds.add(replica.getNode().getId());
@@ -823,40 +891,35 @@
/**
* Called during NC shutdown to notify remote replicas about the shutdown
- * and wait for remote replicas shutdown notification then closes the local replication channel.
+ * and wait for remote replicas' shutdown notification, then close the local
+ * replication channel.
*/
@Override
public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
- try {
- //stop replication thread afters all jobs/logs have been processed
- suspendReplication(false);
- //send shutdown event to remote replicas
- sendShutdownNotifiction();
- //wait until all shutdown events come from all remote replicas
- synchronized (shuttingDownReplicaIds) {
- while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
- try {
- shuttingDownReplicaIds.wait(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ //stop replication thread after all jobs/logs have been processed
+ suspendReplication(false);
+ //send shutdown event to remote replicas
+ sendShutdownNotifiction();
+ //wait until all shutdown events come from all remote replicas
+ synchronized (shuttingDownReplicaIds) {
+ while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
+ try {
+ shuttingDownReplicaIds.wait(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
- LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
- //close replication channel
- asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
-
- LOGGER.log(Level.INFO, "Replication manager stopped.");
- } catch (Exception e) {
- e.printStackTrace();
}
+ LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
+ //close replication channel
+ asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
+
+ LOGGER.log(Level.INFO, "Replication manager stopped.");
}
@Override
public void reportReplicaEvent(ReplicaEvent event) {
- synchronized (replicaEventsQ) {
- replicaEventsQ.offer(event);
- }
+ replicaEventsQ.offer(event);
}
/**
@@ -867,6 +930,9 @@
*/
public void reportFailedReplica(String replicaId) {
Replica replica = replicas.get(replicaId);
+ if (replica == null) {
+ return;
+ }
if (replica.getState() == ReplicaState.DEAD) {
return;
}
@@ -878,16 +944,28 @@
reportReplicaEvent(event);
}
+ private String getReplicaIdBySocket(SocketChannel socketChannel) {
+ InetSocketAddress socketAddress = NetworkingUtil.getSocketAddress(socketChannel);
+ for (Replica replica : replicas.values()) {
+ InetSocketAddress replicaAddress = replica.getAddress(replicationProperties);
+ if (replicaAddress.getHostName().equals(socketAddress.getHostName())
+ && replicaAddress.getPort() == socketAddress.getPort()) {
+ return replica.getId();
+ }
+ }
+ return null;
+ }
+
@Override
- public void startReplicationThreads() {
+ public void startReplicationThreads() throws InterruptedException {
replicationJobsProcessor = new ReplicationJobsProccessor();
//start/continue processing jobs/logs
- if (logsReplicaSockets == null) {
- establishTxnLogsReplicationConnection();
+ if (logsRepSockets == null) {
+ establishTxnLogReplicationHandshake();
getAndInitNewPage();
- txnlogsReplicator = new ReplicationLogFlusher(emptyLogBuffersQ, pendingFlushLogBuffersQ);
- txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogsReplicator);
+ txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ);
+ txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator);
}
replicationJobsProcessor.start();
@@ -936,7 +1014,11 @@
ReplicationProtocol.sendGoodbye(socketChannel);
}
- //4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend.
+ /**
+ * 4. update the LSN_MAP for indexes that were not flushed
+ * to the current append LSN to indicate no operations happened
+ * since the checkpoint start.
+ */
if (laggingIndexesResponse != null) {
for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
String indexPath = laggingIndexes.get(resouceId);
@@ -955,7 +1037,7 @@
long maxRemoteLSN = 0;
ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
- Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>();
+ Map<String, SocketChannel> replicaSockets = new HashMap<>();
try {
for (String replicaId : remoteReplicas) {
replicaSockets.put(replicaId, getReplicaSocket(replicaId));
@@ -1037,7 +1119,42 @@
}
public int getLogPageSize() {
- return logManager.getLogPageSize();
+ return replicationProperties.getLogBufferPageSize();
+ }
+
+ @Override
+ public void replicateTxnLogBatch(final ByteBuffer buffer) {
+ //if replication is suspended, wait until it is resumed
+ try {
+ while (replicationSuspended.get()) {
+ synchronized (replicationSuspended) {
+ replicationSuspended.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ //prepare the batch size buffer
+ txnLogsBatchSizeBuffer.clear();
+ txnLogsBatchSizeBuffer.putInt(buffer.remaining());
+ txnLogsBatchSizeBuffer.flip();
+
+ buffer.mark();
+ for (SocketChannel replicaSocket : logsRepSockets) {
+ try {
+ //send batch size
+ NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
+ //send log
+ NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
+ } finally {
+ txnLogsBatchSizeBuffer.position(0);
+ buffer.reset();
+ }
+ }
+ //move the buffer position to the sent limit
+ buffer.position(buffer.limit());
}
//supporting classes
@@ -1068,12 +1185,12 @@
break;
}
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
}
- public void handleReplicaFailure(String replicaId) {
+ public void handleReplicaFailure(String replicaId) throws InterruptedException {
Replica replica = replicas.get(replicaId);
if (replica.getState() == ReplicaState.DEAD) {
@@ -1127,12 +1244,16 @@
processJob(job, replicaSockets, reusableBuffer);
//if no more jobs to process, close sockets
- if (replicationJobsQ.size() == 0) {
+ if (replicationJobsQ.isEmpty()) {
LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
closeSockets();
}
- } catch (Exception e) {
- e.printStackTrace();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Couldn't complete processing replication job", e);
+ }
}
}
@@ -1169,25 +1290,25 @@
Thread.currentThread().setName("TxnLogs Replication Listener Thread");
LOGGER.log(Level.INFO, "Started listening on socket: " + replicaSocket.socket().getRemoteSocketAddress());
- try {
- BufferedReader incomingResponse = new BufferedReader(
- new InputStreamReader(replicaSocket.socket().getInputStream()));
- String responseLine = "";
+ try (BufferedReader incomingResponse = new BufferedReader(
+ new InputStreamReader(replicaSocket.socket().getInputStream()))) {
while (true) {
- responseLine = incomingResponse.readLine();
+ String responseLine = incomingResponse.readLine();
if (responseLine == null) {
break;
}
//read ACK for job commit log
- String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
+ String ackFrom = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
- addAckToJob(jobId, replicaId);
+ addAckToJob(jobId, ackFrom);
}
- } catch (AsynchronousCloseException e1) {
- LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId);
- } catch (IOException e2) {
- reportFailedReplica(replicaId);
+ } catch (AsynchronousCloseException e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId, e);
+ }
+ } catch (IOException e) {
+ handleReplicationFailure(replicaSocket, e);
}
}
}
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 47e60b2..4da5fd4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -63,8 +63,8 @@
Set<String> nodes = replicationProperties.getNodeReplicationClients(localNodeId);
- Map<String, Set<String>> recoveryCandidates = new HashMap<String, Set<String>>();
- Map<String, Integer> candidatesScore = new HashMap<String, Integer>();
+ Map<String, Set<String>> recoveryCandidates = new HashMap<>();
+ Map<String, Integer> candidatesScore = new HashMap<>();
//2. identify which nodes has backup per lost node data
for (String node : nodes) {
@@ -80,7 +80,7 @@
}
//no active replicas to recover from
- if (locations.size() == 0) {
+ if (locations.isEmpty()) {
throw new IllegalStateException("Could not find any ACTIVE replica to recover " + node + " data.");
}
@@ -94,7 +94,7 @@
recoveryCandidates.put(node, locations);
}
- Map<String, Set<String>> recoveryList = new HashMap<String, Set<String>>();
+ Map<String, Set<String>> recoveryList = new HashMap<>();
//3. find best candidate to recover from per lost replica data
for (Entry<String, Set<String>> entry : recoveryCandidates.entrySet()) {
@@ -113,7 +113,7 @@
if (recoveryList.containsKey(winner)) {
recoveryList.get(winner).add(entry.getKey());
} else {
- Set<String> nodesToRecover = new HashSet<String>();
+ Set<String> nodesToRecover = new HashSet<>();
nodesToRecover.add(entry.getKey());
recoveryList.put(winner, nodesToRecover);
}
@@ -196,15 +196,16 @@
}
break;
} catch (IOException e) {
- e.printStackTrace();
- LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...");
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...", e);
+ }
maxRecoveryAttempts--;
}
}
}
@Override
- public void completeFailbackProcess() throws IOException {
+ public void completeFailbackProcess() throws IOException, InterruptedException {
ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext
.getReplicaResourcesManager();
@@ -237,7 +238,9 @@
* in case of failure during failback completion process we need to construct a new plan
* and get all the files from the start since the remote replicas will change in the new plan.
*/
- e.printStackTrace();
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Failed during completing failback. Restarting failback process...", e);
+ }
startFailbackProcess();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 872adcd..1245674 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -76,9 +76,9 @@
appendOffset = 0;
flushOffset = 0;
isLastPage = false;
- syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
- flushQ = new LinkedBlockingQueue<ILogRecord>();
- remoteJobsQ = new LinkedBlockingQueue<ILogRecord>();
+ syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
+ flushQ = new LinkedBlockingQueue<>();
+ remoteJobsQ = new LinkedBlockingQueue<>();
reusableDsId = new DatasetId(-1);
reusableJobId = new JobId(-1);
}
@@ -113,7 +113,7 @@
@Override
public void appendWithReplication(ILogRecord logRecord, long appendLSN) {
- logRecord.writeLogRecord(appendBuffer, appendLSN);
+ logRecord.writeLogRecord(appendBuffer);
if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
&& logRecord.getLogType() != LogType.WAIT) {
@@ -135,10 +135,9 @@
logRecord.isFlushed(false);
flushQ.offer(logRecord);
}
- } else if (logRecord.getLogSource() == LogSource.REMOTE) {
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
- remoteJobsQ.offer(logRecord);
- }
+ } else if (logRecord.getLogSource() == LogSource.REMOTE
+ && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
+ remoteJobsQ.offer(logRecord);
}
this.notify();
}
@@ -347,11 +346,7 @@
IReplicationThread replicationThread = logRecord.getReplicationThread();
if (replicationThread != null) {
- try {
- replicationThread.notifyLogReplicationRequester(logRecord);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ replicationThread.notifyLogReplicationRequester(logRecord);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index cacd036..0c4cb88 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -37,10 +37,6 @@
@Override
public void log(ILogRecord logRecord) throws ACIDException {
- if (logRecord.getLogSize() > logPageSize) {
- throw new IllegalStateException();
- }
-
//only locally generated logs should be replicated
logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT);
@@ -58,7 +54,11 @@
syncAppendToLogTail(logRecord);
if (logRecord.isReplicated()) {
- replicationManager.replicateLog(logRecord);
+ try {
+ replicationManager.replicateLog(logRecord);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
if (logRecord.getLogSource() == LogSource.LOCAL) {
@@ -69,7 +69,7 @@
try {
logRecord.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
@@ -79,7 +79,7 @@
try {
logRecord.wait();
} catch (InterruptedException e) {
- //ignore
+ Thread.currentThread().interrupt();
}
}
}