Adapt Replication to Append Only LSM Components
This change includes the following:
- Make LSN recording in a single tree in LSM components.
- Pass LSN byte offset with every tree index file being replicated.
- Add LSN to remote recovery logs to check which logs should be replayed.
- Update ILogRecord method names to better describe their operation.
Change-Id: I8bd2656746e1c293b981d5f43e80928314ccbad0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/561
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
index 4259a10..c7b7e3e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
@@ -18,17 +18,16 @@
*/
package org.apache.asterix.common.dataflow;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
public class AsterixLSMIndexUtil {
public static void checkAndSetFirstLSN(AbstractLSMIndex lsmIndex, ILogManager logManager)
- throws HyracksDataException, AsterixException {
-
+ throws HyracksDataException {
// If the index has an empty memory component, we need to set its first LSN (For soft checkpoint)
if (lsmIndex.isCurrentMutableComponentEmpty()) {
//prevent transactions from incorrectly setting the first LSN on a modified component by checking the index is still empty
@@ -42,9 +41,10 @@
}
}
- public static boolean lsmComponentFileHasLSN(AbstractLSMIndex lsmIndex, String componentFilePath) {
+ public static long getComponentFileLSNOffset(AbstractLSMIndex lsmIndex, ILSMComponent lsmComponent,
+ String componentFilePath) throws HyracksDataException {
AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
.getIOOperationCallback();
- return ioOpCallback.componentFileHasLSN(componentFilePath);
+ return ioOpCallback.getComponentFileLSNOffset(lsmComponent, componentFilePath);
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index b3cae06..64201e2 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -21,13 +21,12 @@
import java.util.List;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.common.file.BufferedFileHandle;
// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
@@ -96,7 +95,7 @@
treeIndex.getMetaManager().setLSN(componentLSN);
}
- protected long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException {
+ public static long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException {
return treeIndex.getMetaManager().getLSN();
}
@@ -104,7 +103,7 @@
mutableLastLSNs[writeIndex] = lastLSN;
}
- public void setFirstLSN(long firstLSN) throws AsterixException {
+ public void setFirstLSN(long firstLSN) {
// We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
firstLSNs[writeIndex] = firstLSN;
}
@@ -124,7 +123,13 @@
}
return false;
}
-
- public abstract boolean componentFileHasLSN(String componentFilePath);
+ /**
+ * @param component
+ * @param componentFilePath
+ * @return The LSN byte offset in the LSM disk component if the index is valid, otherwise {@link IMetaDataPageManager#INVALID_LSN_OFFSET}.
+ * @throws HyracksDataException
+ */
+ public abstract long getComponentFileLSNOffset(ILSMComponent component, String componentFilePath)
+ throws HyracksDataException;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 8b4fa01..b522687 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -57,17 +58,18 @@
long maxLSN = -1;
for (ILSMComponent c : diskComponents) {
BTree btree = ((LSMBTreeDiskComponent) c).getBTree();
- maxLSN = Math.max(getTreeIndexLSN(btree), maxLSN);
+ maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(btree), maxLSN);
}
return maxLSN;
}
@Override
- public boolean componentFileHasLSN(String componentFilePath) {
- if (componentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) {
- return true;
+ public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+ throws HyracksDataException {
+ if (diskComponentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) {
+ LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) diskComponent;
+ return btreeComponent.getBTree().getMetaManager().getLSNOffset();
}
-
- return false;
+ return IMetaDataPageManager.INVALID_LSN_OFFSET;
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index 229ccd6..1d0c0a5 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyDiskComponent;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -34,7 +35,6 @@
if (newComponent != null) {
LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) newComponent;
putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
- putLSNIntoMetadata(btreeComponent.getBuddyBTree(), oldComponents);
}
}
@@ -51,19 +51,18 @@
long maxLSN = -1;
for (Object o : diskComponents) {
LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) o;
- maxLSN = Math.max(getTreeIndexLSN(btreeComponent.getBTree()), maxLSN);
+ maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(btreeComponent.getBTree()), maxLSN);
}
return maxLSN;
}
@Override
- public boolean componentFileHasLSN(String componentFilePath) {
- if (componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)
- || componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BUDDY_BTREE_STRING)) {
- return true;
+ public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+ throws HyracksDataException {
+ if (diskComponentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)) {
+ LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) diskComponent;
+ return btreeComponent.getBTree().getMetaManager().getLSNOffset();
}
-
- return false;
+ return IMetaDataPageManager.INVALID_LSN_OFFSET;
}
-
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 3e4ff04..faa9166 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
@@ -55,17 +56,18 @@
long maxLSN = -1;
for (Object o : diskComponents) {
LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) o;
- maxLSN = Math.max(getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), maxLSN);
+ maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), maxLSN);
}
return maxLSN;
}
@Override
- public boolean componentFileHasLSN(String componentFilePath) {
- if (componentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
- return true;
+ public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+ throws HyracksDataException {
+ if (diskComponentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
+ LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) diskComponent;
+ return invIndexComponent.getDeletedKeysBTree().getMetaManager().getLSNOffset();
}
-
- return false;
+ return IMetaDataPageManager.INVALID_LSN_OFFSET;
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 7c483f3..4725d7a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -22,6 +22,7 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
@@ -39,7 +40,6 @@
if (newComponent != null) {
LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) newComponent;
putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
- putLSNIntoMetadata(rtreeComponent.getBTree(), oldComponents);
}
}
@@ -56,18 +56,18 @@
long maxLSN = -1;
for (Object o : diskComponents) {
LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) o;
- maxLSN = Math.max(getTreeIndexLSN(rtreeComponent.getRTree()), maxLSN);
+ maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(rtreeComponent.getRTree()), maxLSN);
}
return maxLSN;
}
@Override
- public boolean componentFileHasLSN(String componentFilePath) {
- if (componentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)
- || componentFilePath.endsWith(LSMRTreeFileManager.BTREE_STRING)) {
- return true;
+ public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+ throws HyracksDataException {
+ if (diskComponentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)) {
+ LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) diskComponent;
+ return rtreeComponent.getRTree().getMetaManager().getLSNOffset();
}
-
- return false;
+ return IMetaDataPageManager.INVALID_LSN_OFFSET;
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 9694949..7c76e98 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -119,9 +119,9 @@
public void setNodeId(String nodeId);
- public int serialize(ByteBuffer buffer);
+ public int writeRemoteRecoveryLog(ByteBuffer buffer);
- public void deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId);
+ public void readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId);
public void setReplicationThread(IReplicationThread replicationThread);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index bbbe59e..f837df2 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -356,7 +356,7 @@
}
@Override
- public void deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId) {
+ public void readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId) {
readLogHeader(buffer);
if (!remoteRecoveryLog || !nodeId.equals(localNodeId)) {
readLogBody(buffer, false);
@@ -370,6 +370,11 @@
LSN = buffer.getLong();
numOfFlushedIndexes = buffer.getInt();
}
+
+ //remote recovery logs need to have the LSN to check which should be replayed
+ if (remoteRecoveryLog && nodeId.equals(localNodeId)) {
+ LSN = buffer.getLong();
+ }
}
private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -499,15 +504,14 @@
}
@Override
- public int serialize(ByteBuffer buffer) {
+ public int writeRemoteRecoveryLog(ByteBuffer buffer) {
int bufferBegin = buffer.position();
writeLogRecordCommonFields(buffer);
-
if (logType == LogType.FLUSH) {
buffer.putLong(LSN);
buffer.putInt(numOfFlushedIndexes);
}
-
+ //LSN must be included in all remote recovery logs (not only FLUSH)
buffer.putLong(LSN);
return buffer.position() - bufferBegin;
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
index f75ed6b..8e020eb 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
@@ -140,14 +140,14 @@
return bb;
}
- public static void writeReplicateLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
+ public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
requestBuffer.clear();
//put request type (4 bytes)
requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
//leave space for log size
requestBuffer.position(requestBuffer.position() + Integer.BYTES);
- int logSize = logRecord.serialize(requestBuffer);
- //put request type (4 bytes)
+ int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
+ //put request size (4 bytes)
requestBuffer.putInt(4, logSize);
requestBuffer.flip();
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index c0af2c8..38be05e 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -89,6 +89,7 @@
full.set(false);
appendOffset = 0;
flushOffset = 0;
+ stop = false;
}
public void flush() {
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 5a88626..e6b2ebf 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -70,6 +70,7 @@
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -272,7 +273,8 @@
Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
//2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
- IDatasetLifecycleManager datasetLifeCycleManager = asterixAppRuntimeContextProvider.getDatasetLifecycleManager();
+ IDatasetLifecycleManager datasetLifeCycleManager = asterixAppRuntimeContextProvider
+ .getDatasetLifecycleManager();
List<IndexInfo> openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo();
Set<Integer> datasetsToForceFlush = new HashSet<Integer>();
for (IndexInfo iInfo : openIndexesInfo) {
@@ -336,9 +338,9 @@
}
if (afp.isLSMComponentFile()) {
String compoentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath(), afp.getNodeId());
- if (afp.isRequireLSNSync()) {
+ if (afp.getLSNByteOffset() != IMetaDataPageManager.INVALID_LSN_OFFSET) {
LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(compoentId,
- destFile.getAbsolutePath());
+ destFile.getAbsolutePath(), afp.getLSNByteOffset());
lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
} else {
updateLSMComponentRemainingFiles(compoentId);
@@ -384,8 +386,8 @@
try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
FileChannel fileChannel = fromFile.getChannel();) {
long fileSize = fileChannel.size();
- fileProperties.initialize(filePath, fileSize, replicaId, false, false, false);
-
+ fileProperties.initialize(filePath, fileSize, replicaId, false,
+ IMetaDataPageManager.INVALID_LSN_OFFSET, false);
outBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
ReplicationRequestType.REPLICATE_FILE);
@@ -437,7 +439,7 @@
//set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
- AsterixReplicationProtocol.writeReplicateLogRequest(outBuffer, logRecord);
+ AsterixReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
}
logRecord = logReader.next();
@@ -475,7 +477,7 @@
inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
//Deserialize log
- remoteLog.deserialize(inBuffer, false, localNodeID);
+ remoteLog.readRemoteLog(inBuffer, false, localNodeID);
remoteLog.setLogSource(LogSource.REMOTE);
if (remoteLog.getLogType() == LogType.JOB_COMMIT) {
@@ -547,7 +549,7 @@
LSMComponentLSNSyncTask syncTask = lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take();
LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(syncTask.getComponentId());
try {
- syncLSMComponentFlushLSN(lsmCompProp, syncTask.getComponentFilePath());
+ syncLSMComponentFlushLSN(lsmCompProp, syncTask);
updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
} catch (Exception e) {
e.printStackTrace();
@@ -558,7 +560,8 @@
}
}
- private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, String filePath) throws Exception {
+ private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask)
+ throws Exception {
long remoteLSN = lsmCompProp.getOriginalLSN();
//LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it
if (remoteLSN == 0) {
@@ -611,7 +614,7 @@
}
}
- Path path = Paths.get(filePath);
+ Path path = Paths.get(syncTask.getComponentFilePath());
if (Files.notExists(path)) {
/*
* This could happen when a merged component arrives and deletes the flushed
@@ -621,7 +624,7 @@
return;
}
- File destFile = new File(filePath);
+ File destFile = new File(syncTask.getComponentFilePath());
ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
metadataBuffer.flip();
@@ -629,12 +632,12 @@
//replace the remote LSN value by the local one
try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw");
FileChannel fileChannel = fileOutputStream.getChannel()) {
+ long lsnStartOffset = syncTask.getLSNByteOffset();
while (metadataBuffer.hasRemaining()) {
- fileChannel.write(metadataBuffer, lsmCompProp.getLSNOffset());
+ lsnStartOffset += fileChannel.write(metadataBuffer, lsnStartOffset);
}
fileChannel.force(true);
}
}
}
-
}
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 3e29828..36e5dff 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -81,6 +81,8 @@
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -296,13 +298,16 @@
long fileSize = fileChannel.size();
if (LSMComponentJob != null) {
- boolean requireLSNSync = AsterixLSMIndexUtil.lsmComponentFileHasLSN(
- (AbstractLSMIndex) LSMComponentJob.getLSMIndex(), filePath);
+ //since this is LSM_COMPONENT REPLICATE job, the job will contain only the component being replicated.
+ ILSMComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext()
+ .getComponentsToBeReplicated().get(0);
+ long LSNByteOffset = AsterixLSMIndexUtil.getComponentFileLSNOffset(
+ (AbstractLSMIndex) LSMComponentJob.getLSMIndex(), diskComponent, filePath);
asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
- requireLSNSync, remainingFiles == 0);
+ LSNByteOffset, remainingFiles == 0);
} else {
- asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, false,
- remainingFiles == 0);
+ asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
+ IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
}
requestBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer,
@@ -343,8 +348,8 @@
} else if (job.getOperation() == ReplicationOperation.DELETE) {
for (String filePath : job.getJobFiles()) {
remainingFiles--;
- asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, false,
- remainingFiles == 0);
+ asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
+ IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
ReplicationRequestType.DELETE_FILE);
@@ -1068,7 +1073,7 @@
ILogRecord logRecord = new LogRecord();
while (responseType != ReplicationRequestType.GOODBYE) {
dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
- logRecord.deserialize(dataBuffer, true, nodeId);
+ logRecord.readRemoteLog(dataBuffer, true, nodeId);
if (logRecord.getNodeId().equals(nodeId)) {
//store log in memory to replay it for recovery
@@ -1243,5 +1248,4 @@
}
}
}
-
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
index 30f2afc..5e0c9b0 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
public class AsterixLSMIndexFileProperties {
@@ -35,36 +36,36 @@
private int ioDeviceNum;
private String idxName;
private boolean lsmComponentFile;
- private boolean requireLSNSync;
private String filePath;
private boolean requiresAck = false;
+ private long LSNByteOffset;
public AsterixLSMIndexFileProperties() {
}
public AsterixLSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
- boolean requireLSNSync, boolean requiresAck) {
- initialize(filePath, fileSize, nodeId, lsmComponentFile, requireLSNSync, requiresAck);
+ long LSNByteOffset, boolean requiresAck) {
+ initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
}
public AsterixLSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
- initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, false, false);
+ initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
+ IMetaDataPageManager.INVALID_LSN_OFFSET, false);
}
- public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
- boolean requireLSNSync, boolean requiresAck) {
+ public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
+ boolean requiresAck) {
this.filePath = filePath;
this.fileSize = fileSize;
this.nodeId = nodeId;
String[] tokens = filePath.split(File.separator);
-
int arraySize = tokens.length;
this.fileName = tokens[arraySize - 1];
this.ioDeviceNum = getDeviceIONumFromName(tokens[arraySize - 2]);
this.idxName = tokens[arraySize - 3];
this.dataverse = tokens[arraySize - 4];
this.lsmComponentFile = lsmComponentFile;
- this.requireLSNSync = requireLSNSync;
+ this.LSNByteOffset = LSNByteOffset;
this.requiresAck = requiresAck;
}
@@ -78,7 +79,7 @@
dos.writeUTF(filePath);
dos.writeLong(fileSize);
dos.writeBoolean(lsmComponentFile);
- dos.writeBoolean(requireLSNSync);
+ dos.writeLong(LSNByteOffset);
dos.writeBoolean(requiresAck);
}
@@ -87,10 +88,10 @@
String filePath = input.readUTF();
long fileSize = input.readLong();
boolean lsmComponentFile = input.readBoolean();
- boolean requireLSNSync = input.readBoolean();
+ long LSNByteOffset = input.readLong();
boolean requiresAck = input.readBoolean();
AsterixLSMIndexFileProperties fileProp = new AsterixLSMIndexFileProperties();
- fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, requireLSNSync, requiresAck);
+ fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
return fileProp;
}
@@ -158,14 +159,6 @@
this.lsmComponentFile = lsmComponentFile;
}
- public boolean isRequireLSNSync() {
- return requireLSNSync;
- }
-
- public void setRequireLSNSync(boolean requireLSNSync) {
- this.requireLSNSync = requireLSNSync;
- }
-
public boolean requiresAck() {
return requiresAck;
}
@@ -184,6 +177,15 @@
sb.append("IDX Name: " + idxName + " ");
sb.append("isLSMComponentFile : " + lsmComponentFile + " ");
sb.append("Dataverse: " + dataverse);
+ sb.append("LSN Byte Offset: " + LSNByteOffset);
return sb.toString();
}
+
+ public long getLSNByteOffset() {
+ return LSNByteOffset;
+ }
+
+ public void setLSNByteOffset(long lSNByteOffset) {
+ LSNByteOffset = lSNByteOffset;
+ }
}
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
index 69f7d07..450eb7d 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
@@ -21,10 +21,12 @@
public class LSMComponentLSNSyncTask {
private String componentFilePath;
private String componentId;
+ private long LSNByteOffset;
- public LSMComponentLSNSyncTask(String componentId, String componentFilePath) {
+ public LSMComponentLSNSyncTask(String componentId, String componentFilePath, long LSNByteOffset) {
this.componentId = componentId;
this.componentFilePath = componentFilePath;
+ this.setLSNByteOffset(LSNByteOffset);
}
public String getComponentFilePath() {
@@ -42,4 +44,12 @@
public void setComponentId(String componentId) {
this.componentId = componentId;
}
+
+ public long getLSNByteOffset() {
+ return LSNByteOffset;
+ }
+
+ public void setLSNByteOffset(long lSNByteOffset) {
+ LSNByteOffset = lSNByteOffset;
+ }
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 84d5dbe..794a6e1 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.frames.LIFOMetaDataFrame;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -50,9 +49,7 @@
this.nodeId = nodeId;
componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0], nodeId);
numberOfFiles = new AtomicInteger(job.getJobFiles().size());
- originalLSN = getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext());
- //TODO this should be changed to a dynamic value when append only LSM indexes are implemented
- LSNOffset = LIFOMetaDataFrame.lsnOff;
+ originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext());
opType = job.getLSMOpType();
}
@@ -60,11 +57,11 @@
}
- public long getLSMComponentLSN(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx) {
+ public static long getLSMComponentLSN(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx) {
long componentLSN = -1;
try {
- componentLSN = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()).getComponentLSN(ctx
- .getComponentsToBeReplicated());
+ componentLSN = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+ .getComponentLSN(ctx.getComponentsToBeReplicated());
} catch (HyracksDataException e) {
e.printStackTrace();
}
@@ -149,14 +146,6 @@
this.componentId = componentId;
}
- public long getLSNOffset() {
- return LSNOffset;
- }
-
- public void setLSNOffset(long lSNOffset) {
- LSNOffset = lSNOffset;
- }
-
public long getOriginalLSN() {
return originalLSN;
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index f4100ee..45ba9bd 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -179,6 +179,7 @@
appendOffset = 0;
flushOffset = 0;
isLastPage = false;
+ stop = false;
}
////////////////////////////////////
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 2a40052..1966c39 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -31,7 +31,7 @@
private IReplicationManager replicationManager;
- public LogManagerWithReplication(TransactionSubsystem txnSubsystem) throws ACIDException {
+ public LogManagerWithReplication(TransactionSubsystem txnSubsystem) {
super(txnSubsystem);
}
@@ -41,7 +41,8 @@
throw new IllegalStateException();
}
- if (logRecord.getLogType() == LogType.FLUSH) {
+ //Remote flush logs do not need to be flushed separately since they may not trigger local flush
+ if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
flushLogsQ.offer(logRecord);
return;
}