Introducing Data Replication to AsterixDB
This change includes the following:
- Add data replication properties to cluster properties and Managix validate command.
- Introduce Data Replication components.
- Add fields required for data replication to LogRecord.
- Introduce a specialized LogManager for data replication.
- Fix for invalid cluster state on node failures.
- ASTERIXDB-139: Fix for cleaning workspace files on startup/shutdown.
- Fix for temp dataset storage reclamation.
- Allow MetadataNode rebinding with CC.
- Add flag to checkpoint to identify sharp checkpoints.
- ASTERIXDB-1170: Fix shutdown sequence.
Change-Id: I729fdd1144dbc9ff039b4bc414494860d7553810
Reviewed-on: https://asterix-gerrit.ics.uci.edu/338
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
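Replication is configured at the cluster level and surfaced to the NCs through the new IAsterixPropertiesProvider.getReplicationProperties() hook added below. A minimal usage sketch, assuming a properties provider and the local node id are available from the application context (class and variable names here are illustrative, not part of this change):

import java.net.InetSocketAddress;
import java.util.Set;

import org.apache.asterix.common.config.AsterixReplicationProperties;
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.replication.Replica;

// Illustrative sketch only: shows how a node could resolve its remote replicas
// from the replication properties introduced in this change.
public class ReplicationPropertiesUsageSketch {
    public static void printRemoteReplicas(IAsterixPropertiesProvider propertiesProvider, String localNodeId) {
        AsterixReplicationProperties replicationProperties = propertiesProvider.getReplicationProperties();
        if (!replicationProperties.isReplicationEnabled()) {
            // replication factor of 1 or no data replication section in the cluster description
            return;
        }
        Set<Replica> remoteReplicas = replicationProperties.getRemoteReplicas(localNodeId);
        for (Replica replica : remoteReplicas) {
            InetSocketAddress address = replica.getAddress(replicationProperties);
            System.out.println(replica.getId() + " -> " + address);
        }
    }
}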
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index f86ed8a..63851bf 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -25,6 +25,10 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.feeds.api.IFeedManager;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationChannel;
+import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
@@ -76,4 +80,14 @@
public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
public IFeedManager getFeedManager();
+
+ public IRemoteRecoveryManager getRemoteRecoveryManager();
+
+ public IReplicaResourcesManager getReplicaResourcesManager();
+
+ public IReplicationManager getReplicationManager();
+
+ public IReplicationChannel getReplicationChannel();
+
+ public void initializeResourceIdFactory() throws HyracksDataException;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index e1e6d96..803e708 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
+import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
@@ -38,12 +39,14 @@
/**
* Flushes all open datasets synchronously.
+ *
* @throws HyracksDataException
*/
void flushAllDatasets() throws HyracksDataException;
/**
* Schedules asynchronous flush on datasets that have memory components with first LSN < nonSharpCheckpointTargetLSN.
+ *
* @param nonSharpCheckpointTargetLSN
* @throws HyracksDataException
*/
@@ -51,6 +54,7 @@
/**
* creates (if necessary) and returns the dataset info.
+ *
* @param datasetID
* @return
*/
@@ -67,6 +71,7 @@
/**
* creates (if necessary) and returns the primary index operation tracker of a dataset.
+ *
* @param datasetID
* @return
*/
@@ -74,8 +79,19 @@
/**
* creates (if necessary) and returns the dataset virtual buffer caches.
+ *
* @param datasetID
* @return
*/
List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+
+ /**
+ * Flushes then closes all open datasets
+ */
+ void closeAllDatasets() throws HyracksDataException;
+
+ /**
+ * @return a list of all indexes that are open at the time of the call.
+ */
+ List<IndexInfo> getOpenIndexesInfo();
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
new file mode 100644
index 0000000..1ef7e3e
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.config;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class AsterixReplicationProperties extends AbstractAsterixProperties {
+
+ private static final Logger LOGGER = Logger.getLogger(AsterixReplicationProperties.class.getName());
+
+ private static int REPLICATION_DATAPORT_DEFAULT = 2000;
+ private static int REPLICATION_FACTOR_DEFAULT = 1;
+ private static int REPLICATION_TIME_OUT_DEFAULT = 15;
+
+ private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
+ private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
+ private final String NODE_NAME_PREFIX;
+ private final Cluster cluster;
+
+ public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, Cluster cluster) {
+ super(accessor);
+ this.cluster = cluster;
+
+ if (cluster != null) {
+ NODE_NAME_PREFIX = cluster.getInstanceName() + "_";
+ } else {
+ NODE_NAME_PREFIX = "";
+ }
+ }
+
+ public boolean isReplicationEnabled() {
+ if (cluster != null && cluster.getDataReplication() != null) {
+ if (getReplicationFactor() == 1) {
+ return false;
+ }
+
+ return cluster.getDataReplication().isEnabled();
+
+ } else {
+ return false;
+ }
+ }
+
+ public String getReplicaIPAddress(String nodeId) {
+ if (cluster != null) {
+
+ for (int i = 0; i < cluster.getNode().size(); i++) {
+ Node node = cluster.getNode().get(i);
+ if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+ return node.getClusterIp();
+ }
+ }
+ }
+ return NODE_IP_ADDRESS_DEFAULT;
+ }
+
+ public int getDataReplicationPort(String nodeId) {
+ if (cluster != null) {
+ return cluster.getDataReplication().getReplicationPort().intValue();
+ }
+
+ return REPLICATION_DATAPORT_DEFAULT;
+ }
+
+ public Set<Replica> getRemoteReplicas(String nodeId) {
+ Set<Replica> remoteReplicas = new HashSet<Replica>();
+
+ int numberOfRemoteReplicas = getReplicationFactor() - 1;
+
+ //Using chained-declustering
+ if (cluster != null) {
+ int nodeIndex = -1;
+ for (int i = 0; i < cluster.getNode().size(); i++) {
+ Node node = cluster.getNode().get(i);
+ if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+ nodeIndex = i;
+ break;
+ }
+ }
+
+ if (nodeIndex == -1) {
+ LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
+ + " in cluster configurations");
+ return null;
+ }
+
+ for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
+ remoteReplicas.add(getReplicaByNodeIndex(i));
+
+ if (remoteReplicas.size() == numberOfRemoteReplicas) {
+ break;
+ }
+ }
+
+ if (remoteReplicas.size() != numberOfRemoteReplicas) {
+ for (int i = 0; i < cluster.getNode().size(); i++) {
+
+ remoteReplicas.add(getReplicaByNodeIndex(i));
+
+ if (remoteReplicas.size() == numberOfRemoteReplicas) {
+ break;
+ }
+ }
+ }
+ }
+ return remoteReplicas;
+ }
+
+ private Replica getReplicaByNodeIndex(int nodeIndex) {
+ Node node = cluster.getNode().get(nodeIndex);
+ Node replicaNode = new Node();
+ replicaNode.setId(getRealCluserNodeID(node.getId()));
+ replicaNode.setClusterIp(node.getClusterIp());
+ return new Replica(replicaNode);
+ }
+
+ public Replica getReplicaById(String nodeId) {
+ int nodeIndex = -1;
+ if (cluster != null) {
+ for (int i = 0; i < cluster.getNode().size(); i++) {
+ Node node = cluster.getNode().get(i);
+
+ if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+ nodeIndex = i;
+ break;
+ }
+ }
+ }
+
+ if (nodeIndex < 0) {
+ return null;
+ }
+
+ return getReplicaByNodeIndex(nodeIndex);
+ }
+
+ public Set<String> getRemoteReplicasIds(String nodeId) {
+ Set<String> remoteReplicasIds = new HashSet<String>();
+ Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
+
+ for (Replica replica : remoteReplicas) {
+ remoteReplicasIds.add(replica.getId());
+ }
+
+ return remoteReplicasIds;
+ }
+
+ public String getRealCluserNodeID(String nodeId) {
+ return NODE_NAME_PREFIX + nodeId;
+ }
+
+ public Set<String> getNodeReplicasIds(String nodeId) {
+ Set<String> replicaIds = new HashSet<String>();
+ replicaIds.add(nodeId);
+ replicaIds.addAll(getRemoteReplicasIds(nodeId));
+ return replicaIds;
+ }
+
+ public String getReplicationStore() {
+ if (cluster != null) {
+ return cluster.getDataReplication().getReplicationStore();
+ }
+ return REPLICATION_STORE_DEFAULT;
+ }
+
+ public int getReplicationFactor() {
+ if (cluster != null) {
+ if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
+ return REPLICATION_FACTOR_DEFAULT;
+ }
+ return cluster.getDataReplication().getReplicationFactor().intValue();
+ }
+ return REPLICATION_FACTOR_DEFAULT;
+ }
+
+ public int getReplicationTimeOut() {
+ if (cluster != null) {
+ return cluster.getDataReplication().getReplicationTimeOut().intValue();
+ }
+ return REPLICATION_TIME_OUT_DEFAULT;
+ }
+
+}
\ No newline at end of file
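getRemoteReplicas() above selects, for each node, the (replication factor - 1) nodes that follow it in the cluster node list, wrapping around to the beginning (chained declustering). A standalone sketch of the same index arithmetic, assuming the replication factor does not exceed the cluster size:

import java.util.ArrayList;
import java.util.List;

// Standalone illustration of the chained-declustering selection used in getRemoteReplicas():
// each node replicates to the (replicationFactor - 1) nodes that follow it, wrapping around.
public class ChainedDeclusteringSketch {
    public static List<Integer> remoteReplicaIndexes(int nodeIndex, int clusterSize, int replicationFactor) {
        List<Integer> replicas = new ArrayList<>();
        for (int i = 1; i < replicationFactor && i < clusterSize; i++) {
            replicas.add((nodeIndex + i) % clusterSize);
        }
        return replicas;
    }

    public static void main(String[] args) {
        // With 4 nodes and replication factor 3, node 3 replicates to nodes 0 and 1.
        System.out.println(remoteReplicaIndexes(3, 4, 3)); // prints [0, 1]
    }
}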
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
index 93c58be..e6f383f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
@@ -28,8 +28,10 @@
public AsterixMetadataProperties getMetadataProperties();
public AsterixExternalProperties getExternalProperties();
-
+
public AsterixFeedProperties getFeedProperties();
AsterixBuildProperties getBuildProperties();
+
+ public AsterixReplicationProperties getReplicationProperties();
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 776549a..21500b7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -39,7 +39,8 @@
@Override
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+ || opType == LSMOperationType.REPLICATE) {
dsInfo.declareActiveIOOperation();
}
}
@@ -47,7 +48,8 @@
@Override
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+ || opType == LSMOperationType.REPLICATE) {
dsInfo.undeclareActiveIOOperation();
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 20b07fa..adf1152 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -74,6 +74,7 @@
capacity = storageProperties.getMemoryComponentGlobalBudget();
used = 0;
logRecord = new LogRecord();
+ logRecord.setNodeId(logManager.getNodeId());
}
@Override
@@ -112,10 +113,10 @@
if (dsInfo.indexes.containsKey(resourceID)) {
throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
}
- dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index));
+ dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
}
- private int getDIDfromResourceName(String resourceName) throws HyracksDataException {
+ public int getDIDfromResourceName(String resourceName) throws HyracksDataException {
LocalResource lr = resourceRepository.getResourceByName(resourceName);
if (lr == null) {
return -1;
@@ -123,7 +124,7 @@
return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
}
- private long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
+ public long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
LocalResource lr = resourceRepository.getResourceByName(resourceName);
if (lr == null) {
return -1;
@@ -279,15 +280,25 @@
@Override
public synchronized List<IIndex> getOpenIndexes() {
+ List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
List<IIndex> openIndexes = new ArrayList<IIndex>();
+ for (IndexInfo iInfo : openIndexesInfo) {
+ openIndexes.add(iInfo.index);
+ }
+ return openIndexes;
+ }
+
+ @Override
+ public synchronized List<IndexInfo> getOpenIndexesInfo() {
+ List<IndexInfo> openIndexesInfo = new ArrayList<IndexInfo>();
for (DatasetInfo dsInfo : datasetInfos.values()) {
for (IndexInfo iInfo : dsInfo.indexes.values()) {
if (iInfo.isOpen) {
- openIndexes.add(iInfo.index);
+ openIndexesInfo.add(iInfo);
}
}
}
- return openIndexes;
+ return openIndexesInfo;
}
public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
@@ -358,11 +369,27 @@
}
}
- private class IndexInfo extends Info {
+ public class IndexInfo extends Info {
private final ILSMIndex index;
+ private final long resourceId;
+ private final int datasetId;
- public IndexInfo(ILSMIndex index) {
+ public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
this.index = index;
+ this.datasetId = datasetId;
+ this.resourceId = resourceId;
+ }
+
+ public ILSMIndex getIndex() {
+ return index;
+ }
+
+ public long getResourceId() {
+ return resourceId;
+ }
+
+ public int getDatasetId() {
+ return datasetId;
}
}
@@ -456,14 +483,6 @@
+ ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
+ memoryAllocated;
}
-
- public boolean isMemoryAllocated() {
- return memoryAllocated;
- }
-
- public int getDatasetID() {
- return datasetID;
- }
}
@Override
@@ -520,7 +539,7 @@
private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
if (!dsInfo.isExternal) {
synchronized (logRecord) {
- logRecord.formFlushLogRecord(dsInfo.datasetID, null);
+ logRecord.formFlushLogRecord(dsInfo.datasetID, null, dsInfo.indexes.size());
try {
logManager.log(logRecord);
} catch (ACIDException e) {
@@ -588,15 +607,19 @@
removeDatasetFromCache(dsInfo.datasetID);
}
+ public void closeAllDatasets() throws HyracksDataException {
+ for (DatasetInfo dsInfo : datasetInfos.values()) {
+ closeDataset(dsInfo);
+ }
+ }
+
@Override
public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
if (dumpState) {
dumpState(outputStream);
}
- for (DatasetInfo dsInfo : datasetInfos.values()) {
- closeDataset(dsInfo);
- }
+ closeAllDatasets();
datasetVirtualBufferCaches.clear();
datasetOpTrackers.clear();
@@ -686,4 +709,4 @@
int did = Integer.parseInt(resourceName);
allocateDatasetMemory(did);
}
-}
\ No newline at end of file
+}
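The new getOpenIndexesInfo() exposes the open indexes together with their dataset and resource ids, which getOpenIndexes() alone could not provide. A hypothetical consumer (not part of this change), assuming access to an IDatasetLifecycleManager instance:

import java.util.List;

import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;

// Sketch of a consumer of getOpenIndexesInfo(): the IndexInfo entries expose the owning
// dataset id and resource id, which replication and recovery code needs in order to
// address individual indexes.
public class OpenIndexesReportSketch {
    public static void report(IDatasetLifecycleManager datasetLifecycleManager) {
        List<IndexInfo> openIndexes = datasetLifecycleManager.getOpenIndexesInfo();
        for (IndexInfo indexInfo : openIndexes) {
            System.out.println("dataset " + indexInfo.getDatasetId() + ", resource " + indexInfo.getResourceId());
        }
    }
}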
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b4a3ac9..437fac4 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,7 +59,8 @@
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
incrementNumActiveOperations(modificationCallback);
- } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+ || opType == LSMOperationType.REPLICATE) {
dsInfo.declareActiveIOOperation();
}
}
@@ -68,7 +69,8 @@
public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
// Searches are immediately considered complete, because they should not prevent the execution of flushes.
- if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+ || opType == LSMOperationType.REPLICATE) {
completeOperation(index, opType, searchCallback, modificationCallback);
}
}
@@ -84,7 +86,8 @@
} else if (numActiveOperations.get() < 0) {
throw new HyracksDataException("The number of active operations cannot be negative!");
}
- } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+ } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+ || opType == LSMOperationType.REPLICATE) {
dsInfo.undeclareActiveIOOperation();
}
}
@@ -119,7 +122,7 @@
}
LogRecord logRecord = new LogRecord();
- logRecord.formFlushLogRecord(datasetID, this);
+ logRecord.formFlushLogRecord(datasetID, this, logManager.getNodeId(), dsInfo.getDatasetIndexes().size());
try {
logManager.log(logRecord);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
index 2cd0554..4259a10 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
@@ -41,4 +41,10 @@
}
}
}
+
+ public static boolean lsmComponentFileHasLSN(AbstractLSMIndex lsmIndex, String componentFilePath) {
+ AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+ .getIOOperationCallback();
+ return ioOpCallback.componentFileHasLSN(componentFilePath);
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index abf7ba9..76a11d1 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -151,4 +151,7 @@
}
return false;
}
+
+ public abstract boolean componentFileHasLSN(String componentFilePath);
+
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 12680fd..8b4fa01 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.btree.impls.BTree;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -60,4 +61,13 @@
}
return maxLSN;
}
+
+ @Override
+ public boolean componentFileHasLSN(String componentFilePath) {
+ if (componentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) {
+ return true;
+ }
+
+ return false;
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index b5ce879..229ccd6 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyDiskComponent;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -55,4 +56,14 @@
return maxLSN;
}
+ @Override
+ public boolean componentFileHasLSN(String componentFilePath) {
+ if (componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)
+ || componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BUDDY_BTREE_STRING)) {
+ return true;
+ }
+
+ return false;
+ }
+
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index fd3cf12..3e4ff04 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
+import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -58,4 +59,13 @@
}
return maxLSN;
}
+
+ @Override
+ public boolean componentFileHasLSN(String componentFilePath) {
+ if (componentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
+ return true;
+ }
+
+ return false;
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 5d243a3..7c483f3 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
+import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
@@ -59,4 +60,14 @@
}
return maxLSN;
}
+
+ @Override
+ public boolean componentFileHasLSN(String componentFilePath) {
+ if (componentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)
+ || componentFilePath.endsWith(LSMRTreeFileManager.BTREE_STRING)) {
+ return true;
+ }
+
+ return false;
+ }
}
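Each IO-operation callback now reports, by file suffix, which files of an LSM disk component embed an LSN, and AsterixLSMIndexUtil.lsmComponentFileHasLSN() exposes that check to callers. A sketch of an assumed caller (not part of this change) that uses it to decide which replicated component files need LSN handling:

import java.util.List;

import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;

// Assumed caller: only component files whose suffix carries an LSN need LSN bookkeeping
// on a replica; the remaining component files can be handled verbatim.
public class ComponentLsnFilterSketch {
    public static void process(AbstractLSMIndex lsmIndex, List<String> componentFilePaths) {
        for (String filePath : componentFilePaths) {
            if (AsterixLSMIndexUtil.lsmComponentFileHasLSN(lsmIndex, filePath)) {
                // this file embeds an LSN: record/remap it before making the component visible
            } else {
                // no LSN inside this file: nothing to adjust
            }
        }
    }
}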
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java
new file mode 100644
index 0000000..fc4f1ab
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Set;
+
+import org.apache.hyracks.api.replication.impl.AbstractReplicationJob;
+
+/**
+ * LSMIndexReplicationJob is used only for LSM components at the Hyracks level.
+ * AsterixReplicationJob is used for everything else.
+ * Currently it is used to transfer index metadata files.
+ */
+public class AsterixReplicationJob extends AbstractReplicationJob {
+
+ public AsterixReplicationJob(ReplicationJobType jobType, ReplicationOperation operation,
+ ReplicationExecutionType executionType, Set<String> filesToReplicate) {
+ super(jobType, operation, executionType, filesToReplicate);
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
new file mode 100644
index 0000000..63d29a0
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+public interface IRemoteRecoveryManager {
+
+ public void performRemoteRecovery();
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
new file mode 100644
index 0000000..f9481a0
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+import java.util.Set;
+
+public interface IReplicaResourcesManager {
+
+ public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
+
+ public String getLocalStorageFolder();
+
+ public long getMinRemoteLSN(Set<String> remoteNodes);
+
+ public void deleteAsterixStorageData() throws IOException;
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java
new file mode 100644
index 0000000..56ae20f
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+
+public interface IReplicationChannel {
+
+ /**
+ * Opens the replication channel and starts accepting replication requests.
+ */
+ public void start();
+
+ /**
+ * Closes the replication channel.
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException;
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
new file mode 100644
index 0000000..276d498
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.hyracks.api.replication.IIOReplicationManager;
+
+public interface IReplicationManager extends IIOReplicationManager {
+
+ /**
+ * Asynchronously sends a serialized version of the record to remote replicas.
+ *
+ * @param logRecord
+ * The log record to be replicated,
+ */
+ public void replicateLog(ILogRecord logRecord);
+
+ /**
+ * Checks whether a log record has been replicated
+ *
+ * @param logRecord
+ * the log to check for.
+ * @return true, if all ACKs were received from remote replicas.
+ */
+ public boolean hasBeenReplicated(ILogRecord logRecord);
+
+ /**
+ * Requests txns logs from a remote replica.
+ *
+ * @param remoteReplicaId
+ * The replica id to send the request to.
+ * @param replicasDataToRecover
+ * Get logs that belong to those replicas.
+ * @param fromLSN
+ * Low water mark for logs to be requested.
+ * @return The logs received that belong to the local node.
+ * @throws IOException
+ * @throws ACIDException
+ */
+ public ArrayList<ILogRecord> requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover,
+ long fromLSN) throws IOException, ACIDException;
+
+ /**
+ * Requests LSM components files from a remote replica.
+ *
+ * @param remoteReplicaId
+ * The replica id to send the request to.
+ * @param replicasDataToRecover
+ * Get files that belong to those replicas.
+ * @throws IOException
+ */
+ public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover) throws IOException;
+
+ /**
+ * Requests current maximum LSN from remote replicas.
+ *
+ * @param remoteReplicaIds
+ * remote replicas to send the request to.
+ * @return The maximum of the received maximum LSNs.
+ * @throws IOException
+ */
+ public long getMaxRemoteLSN(Set<String> remoteReplicaIds) throws IOException;
+
+ /**
+ * Sends the IP address of the local replica to all remote replicas.
+ *
+ * @throws IOException
+ */
+ public void broadcastNewIPAddress() throws IOException;
+
+ /**
+ * @return The number of remote replicas that are in ACTIVE state.
+ */
+ public int getActiveReplicasCount();
+
+ /**
+ * @return The IDs of the remote replicas that are in DEAD state.
+ */
+ public Set<String> getDeadReplicasIds();
+
+ /**
+ * Starts processing of ASYNC replication jobs as well as Txn logs.
+ */
+ public void startReplicationThreads();
+
+ /**
+ * Checks and sets each remote replica state.
+ */
+ public void initializeReplicasState();
+
+ /**
+ * Updates remote replica (in-memory) information.
+ *
+ * @param replica
+ * the replica to update.
+ */
+ public void updateReplicaInfo(Replica replica);
+
+ /**
+ * @return The IDs of the remote replicas that are in ACTIVE state.
+ */
+ public Set<String> getActiveReplicasIds();
+
+ /**
+ * Submits a ReplicaEvent to ReplicationEventsMonitor thread.
+ *
+ * @param event
+ */
+ public void reportReplicaEvent(ReplicaEvent event);
+
+ /**
+ * Requests the current minimum LSN of a remote replica.
+ *
+ * @param replicaId
+ * The replica to send the request to.
+ * @return The returned minimum LSN from the remote replica.
+ * @throws IOException
+ */
+ public long requestReplicaMinLSN(String replicaId) throws IOException;
+
+ /**
+ * Sends a request to remote replicas to flush indexes that have LSN less than nonSharpCheckpointTargetLSN
+ *
+ * @param nonSharpCheckpointTargetLSN
+ * @throws IOException
+ */
+ public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
+
+}
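Together with IRecoveryManager.replayRemoteLogs() (also added in this change), the interface above suggests how remote recovery is composed. The following is only a hypothetical sketch of that flow; the actual RemoteRecoveryManager implementation in this change may differ:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;

import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.IRecoveryManager;

// Hypothetical composition of the new interfaces, shown only to illustrate how the
// pieces are meant to fit together.
public class RemoteRecoverySketch {
    public static void recoverFrom(String remoteReplicaId, Set<String> replicasDataToRecover, long fromLSN,
            IReplicationManager replicationManager, IRecoveryManager recoveryManager)
            throws IOException, ACIDException {
        // 1. pull the LSM component files that belong to the replicas being recovered
        replicationManager.requestReplicaFiles(remoteReplicaId, replicasDataToRecover);
        // 2. pull the transaction logs written after fromLSN
        ArrayList<ILogRecord> remoteLogs = replicationManager.requestReplicaLogs(remoteReplicaId,
                replicasDataToRecover, fromLSN);
        // 3. replay the logs locally to bring the recovered indexes up to date
        recoveryManager.replayRemoteLogs(remoteLogs);
    }
}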
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
new file mode 100644
index 0000000..3e2569d
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.transactions.LogRecord;
+
+public interface IReplicationThread extends Runnable {
+
+ /**
+ * Sends a notification to this thread that logRecord has been flushed.
+ *
+ * @param logRecord
+ * The log that has been flushed.
+ */
+ public void notifyLogReplicationRequester(LogRecord logRecord);
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
new file mode 100644
index 0000000..4c3f728
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class Replica {
+
+ public enum ReplicaState {
+ ACTIVE,
+ DEAD,
+ UNKNOWN
+ }
+
+ final Node node;
+ private ReplicaState state = ReplicaState.UNKNOWN;
+
+ public Replica(Node node) {
+ this.node = node;
+ }
+
+ public ReplicaState getState() {
+ return state;
+ }
+
+ public void setState(ReplicaState state) {
+ this.state = state;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+
+ public String getId() {
+ return node.getId();
+ }
+
+ public InetSocketAddress getAddress(AsterixReplicationProperties asterixReplicationProperties) {
+ String replicaIPAddress = node.getClusterIp();
+ int replicationPort = asterixReplicationProperties.getDataReplicationPort(node.getId());
+ InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
+ return replicaAddress;
+ }
+
+ public static Replica create(DataInput input) throws IOException {
+ Node node = new Node();
+ Replica replica = new Replica(node);
+ replica.readFields(input);
+ return replica;
+ }
+
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeUTF(node.getId());
+ output.writeUTF(node.getClusterIp());
+ output.writeInt(state.ordinal());
+ }
+
+ public void readFields(DataInput input) throws IOException {
+ this.node.setId(input.readUTF());
+ this.node.setClusterIp(input.readUTF());
+ this.state = ReplicaState.values()[input.readInt()];
+ }
+
+ public void serialize(OutputStream out) throws IOException {
+ DataOutputStream dos = new DataOutputStream(out);
+ writeFields(dos);
+ }
+}
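Replica carries hand-rolled serialization (writeFields/readFields) so that replica descriptors can be exchanged over the replication sockets. A small round-trip sketch with illustrative node values:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.event.schema.cluster.Node;

// Round-trip sketch for the Replica serialization above; node id and IP are illustrative.
public class ReplicaSerializationSketch {
    public static void main(String[] args) throws IOException {
        Node node = new Node();
        node.setId("asterix_nc1");
        node.setClusterIp("10.0.0.1");
        Replica original = new Replica(node);

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        original.serialize(bos); // writes id, cluster IP, and state ordinal

        Replica copy = Replica.create(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy.getId() + " @ " + copy.getNode().getClusterIp());
    }
}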
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
new file mode 100644
index 0000000..0797a02
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ReplicaEvent {
+
+ /*
+ * FAIL: remote replica failed.
+ * JOIN: remote replica rejoined the cluster.
+ * SHUTDOWN: remote replica is shutting down normally
+ * */
+ public enum ReplicaEventType {
+ FAIL,
+ JOIN,
+ SHUTDOWN
+ }
+
+ Replica replica;
+ ReplicaEventType eventType;
+
+ public ReplicaEvent(Replica replica, ReplicaEventType eventType) {
+ this.replica = replica;
+ this.eventType = eventType;
+ }
+
+ public Replica getReplica() {
+ return replica;
+ }
+
+ public void setReplica(Replica replica) {
+ this.replica = replica;
+ }
+
+ public ReplicaEventType getEventType() {
+ return eventType;
+ }
+
+ public void setEventType(ReplicaEventType eventType) {
+ this.eventType = eventType;
+ }
+
+ public void serialize(OutputStream out) throws IOException {
+ DataOutputStream dos = new DataOutputStream(out);
+ replica.writeFields(dos);
+ dos.writeInt(eventType.ordinal());
+ }
+
+ public static ReplicaEvent create(DataInput input) throws IOException {
+ Replica replica = Replica.create(input);
+ ReplicaEventType eventType = ReplicaEventType.values()[input.readInt()];
+ ReplicaEvent event = new ReplicaEvent(replica, eventType);
+ return event;
+ }
+
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index 9e28cda..02d69e5 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -24,4 +24,6 @@
public void flush();
+ public void appendWithReplication(ILogRecord logRecord, long appendLSN);
+
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index fc0b407..82a0cca 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -18,40 +18,71 @@
*/
package org.apache.asterix.common.transactions;
+import java.io.IOException;
+
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.replication.IReplicationManager;
public interface ILogManager {
/**
* Submits a logRecord to log Manager which appends it to the log tail
+ *
* @param logRecord
* @throws ACIDException
*/
public void log(ILogRecord logRecord) throws ACIDException;
/**
- *
* @param isRecoveryMode
* @returnLogReader instance which enables reading the log files
*/
public ILogReader getLogReader(boolean isRecoveryMode);
-
+
/**
- *
* @return the last LSN the log manager used
*/
- public long getAppendLSN();
-
+ public long getAppendLSN();
+
/**
* Deletes all log partitions which have a maximum LSN less than checkpointLSN
+ *
* @param checkpointLSN
*/
public void deleteOldLogFiles(long checkpointLSN);
-
+
/**
- *
* @return the smallest readable LSN on the current log partitions
*/
public long getReadableSmallestLSN();
+ /**
+ * @return The local NC ID
+ */
+ public String getNodeId();
+
+ /**
+ * Deletes all log files and starts a new log partition whose LSNs are greater than LSNtoStartFrom.
+ *
+ * @param LSNtoStartFrom
+ * @throws IOException
+ */
+ public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException;
+
+ /**
+ * @return the log page size in bytes
+ */
+ public int getLogPageSize();
+
+ /**
+ * @param replicationManager
+ * the replication manager to be used to replicate logs
+ */
+ public void setReplicationManager(IReplicationManager replicationManager);
+
+ /**
+ * @return the number of log pages
+ */
+ public int getNumLogPages();
+
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 16c51fe..9694949 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -20,22 +20,22 @@
import java.nio.ByteBuffer;
+import org.apache.asterix.common.replication.IReplicationThread;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public interface ILogRecord {
- public static final int JOB_TERMINATE_LOG_SIZE = 13; //JOB_COMMIT or ABORT log type
- public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
- public static final int UPDATE_LOG_BASE_SIZE = 54;
- public static final int FLUSH_LOG_SIZE = 17;
-
-
public enum RECORD_STATUS{
TRUNCATED,
BAD_CHKSUM,
OK
}
+ public static final int JOB_TERMINATE_LOG_SIZE = 18; //JOB_COMMIT or ABORT log type
+ public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
+ public static final int UPDATE_LOG_BASE_SIZE = 59;
+ public static final int FLUSH_LOG_SIZE = 22;
+
public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer);
public void writeLogRecord(ByteBuffer buffer);
@@ -115,4 +115,26 @@
public void setPKValue(ITupleReference PKValue);
+ public String getNodeId();
+
+ public void setNodeId(String nodeId);
+
+ public int serialize(ByteBuffer buffer);
+
+ public void deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId);
+
+ public void setReplicationThread(IReplicationThread replicationThread);
+
+ public void setLogSource(byte logSource);
+
+ public byte getLogSource();
+
+ public int getSerializedLogSize();
+
+ public void writeLogRecord(ByteBuffer buffer, long appendLSN);
+
+ public ByteBuffer getSerializedLog();
+
+ public void formJobTerminateLogRecord(int jobId, boolean isCommit, String nodeId);
+
}
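Each fixed log-size constant above grows by exactly 5 bytes because [Header1] now starts with LogSource (1 byte) and carries NodeIdLength (4 bytes) in addition to LogType and JobId; the node id string itself is variable-length and not included in these base sizes. A small sanity check of that arithmetic (class and field names are illustrative):

// Sanity check of the +5 byte growth of the fixed log sizes: the new header adds
// LogSource (1 byte) and NodeIdLength (4 bytes); the node id string itself is variable
// and not part of these base constants.
public class LogSizeArithmeticSketch {
    static final int LOG_SOURCE_LEN = Byte.BYTES;               // 1
    static final int NODE_ID_LENGTH_FIELD_LEN = Integer.BYTES;  // 4

    public static void main(String[] args) {
        int growth = LOG_SOURCE_LEN + NODE_ID_LENGTH_FIELD_LEN; // 5
        System.out.println(13 + growth); // 18 == JOB_TERMINATE_LOG_SIZE
        System.out.println(25 + growth); // 30 == ENTITY_COMMIT_LOG_BASE_SIZE
        System.out.println(54 + growth); // 59 == UPDATE_LOG_BASE_SIZE
        System.out.println(17 + growth); // 22 == FLUSH_LOG_SIZE
    }
}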
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index e8d408b..9ea9957 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.transactions;
import java.io.IOException;
+import java.util.ArrayList;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -89,5 +90,25 @@
*/
public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException;
+ /**
+ * Performs recovery based on the passed logs
+ * @param remoteLogs the remote logs to be replayed
+ * @throws HyracksDataException
+ * @throws ACIDException
+ */
+ public void replayRemoteLogs(ArrayList<ILogRecord> remoteLogs) throws HyracksDataException, ACIDException;
+
+ /**
+ *
+ * @return min first LSN of the open indexes (including remote indexes if replication is enabled)
+ * @throws HyracksDataException
+ */
+ public long getMinFirstLSN() throws HyracksDataException;
+ /**
+ *
+ * @return min first LSN of the open indexes
+ * @throws HyracksDataException
+ */
+ public long getLocalMinFirstLSN() throws HyracksDataException;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index a510e51..bbbe59e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -20,10 +20,13 @@
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.replication.IReplicationThread;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -31,9 +34,12 @@
/*
* == LogRecordFormat ==
* ---------------------------
- * [Header1] (5 bytes) : for all log types
+ * [Header1] (10 bytes + NodeId Length) : for all log types
+ * LogSource(1)
* LogType(1)
* JobId(4)
+ * NodeIdLength(4)
+ * NodeId(?)
* ---------------------------
* [Header2] (12 bytes + PKValueSize) : for entity_commit and update log types
* DatasetId(4) //stored in dataset_dataset in Metadata Node
@@ -60,12 +66,15 @@
* 2) ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
* --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
* 3) UPDATE: 54 + PKValueSize + NewValueSize (5 + 12 + PKValueSize + 20 + 9 + NewValueSize + 8)
- * 4) FLUSH: 5 + 8 + DatasetId(4)
- * --> UPDATE_LOG_BASE_SIZE = 54
+ * 4) FLUSH: 5 + 8 + DatasetId(4) (in case of serialization: + (8 bytes for LSN) + (4 bytes for number of flushed indexes))
*/
+
public class LogRecord implements ILogRecord {
//------------- fields in a log record (begin) ------------//
+ private byte logSource;
+ private String nodeId;
+ private int nodeIdLength;
private byte logType;
private int jobId;
private int datasetId;
@@ -92,6 +101,11 @@
private final CRC32 checksumGen;
private int[] PKFields;
private PrimaryIndexOperationTracker opTracker;
+ private IReplicationThread replicationThread;
+ private ByteBuffer serializedLog;
+ private final Map<String, byte[]> nodeIdsMap;
+ //this field is used for serialized flush logs only to indicate how many indexes were flushed using its LSN.
+ private int numOfFlushedIndexes;
public LogRecord() {
isFlushed = new AtomicBoolean(false);
@@ -99,29 +113,43 @@
readPKValue = new PrimaryKeyTupleReference();
readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
checksumGen = new CRC32();
+ this.nodeIdsMap = new HashMap<String, byte[]>();
+ logSource = LogSource.LOCAL;
}
- private final static int TYPE_LEN = Byte.SIZE / Byte.SIZE;
- public final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE;
- public final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE;
- private final static int PRVLSN_LEN = Long.SIZE / Byte.SIZE;
- private final static int RSID_LEN = Long.SIZE / Byte.SIZE;
- private final static int LOGRCD_SZ_LEN = Integer.SIZE / Byte.SIZE;
- private final static int FLDCNT_LEN = Integer.SIZE / Byte.SIZE;
- private final static int NEWOP_LEN = Byte.SIZE / Byte.SIZE;
- private final static int NEWVALSZ_LEN = Integer.SIZE / Byte.SIZE;
- private final static int CHKSUM_LEN = Long.SIZE / Byte.SIZE;
+ private final static int LOG_SOURCE_LEN = Byte.BYTES;
+ private final static int NODE_ID_STRING_LENGTH = Integer.BYTES;
+ private final static int TYPE_LEN = Byte.BYTES;
+ public final static int PKHASH_LEN = Integer.BYTES;
+ public final static int PKSZ_LEN = Integer.BYTES;
+ private final static int PRVLSN_LEN = Long.BYTES;
+ private final static int RSID_LEN = Long.BYTES;
+ private final static int LOGRCD_SZ_LEN = Integer.BYTES;
+ private final static int FLDCNT_LEN = Integer.BYTES;
+ private final static int NEWOP_LEN = Byte.BYTES;
+ private final static int NEWVALSZ_LEN = Integer.BYTES;
+ private final static int CHKSUM_LEN = Long.BYTES;
- private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JobId.BYTES;
+ private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES + NODE_ID_STRING_LENGTH;
private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN;
private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
- @Override
- public void writeLogRecord(ByteBuffer buffer) {
- int beginOffset = buffer.position();
+ private void writeLogRecordCommonFields(ByteBuffer buffer) {
+ buffer.put(logSource);
buffer.put(logType);
buffer.putInt(jobId);
+ if (nodeIdsMap.containsKey(nodeId)) {
+ buffer.put(nodeIdsMap.get(nodeId));
+ } else {
+ //byte array for node id length and string
+ byte[] bytes = new byte[(Integer.SIZE / 8) + nodeId.length()];
+ buffer.putInt(nodeId.length());
+ buffer.put(nodeId.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ buffer.position(buffer.position() - bytes.length);
+ buffer.get(bytes, 0, bytes.length);
+ nodeIdsMap.put(nodeId, bytes);
+ }
if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT) {
buffer.putInt(datasetId);
buffer.putInt(PKHashValue);
@@ -144,15 +172,59 @@
if (logType == LogType.FLUSH) {
buffer.putInt(datasetId);
}
+ }
+
+ @Override
+ public void writeLogRecord(ByteBuffer buffer) {
+ int beginOffset = buffer.position();
+ writeLogRecordCommonFields(buffer);
+ checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
+ buffer.putLong(checksum);
+ }
+
+ //this method is used when replication is enabled to include the log record LSN in the serialized version
+ @Override
+ public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
+ int beginOffset = buffer.position();
+ writeLogRecordCommonFields(buffer);
+
+ if (logSource == LogSource.LOCAL) {
+ //copy the serialized log to send it to replicas
+ int serializedLogSize = getSerializedLogSize(logType, logSize);
+
+ if (serializedLog == null || serializedLog.capacity() < serializedLogSize) {
+ serializedLog = ByteBuffer.allocate(serializedLogSize);
+ } else {
+ serializedLog.clear();
+ }
+
+ int currentPosition = buffer.position();
+ int currentLogSize = (currentPosition - beginOffset);
+
+ buffer.position(beginOffset);
+ buffer.get(serializedLog.array(), 0, currentLogSize);
+ serializedLog.position(currentLogSize);
+ if (logType == LogType.FLUSH) {
+ serializedLog.putLong(appendLSN);
+ serializedLog.putInt(numOfFlushedIndexes);
+ }
+ serializedLog.flip();
+ buffer.position(currentPosition);
+ }
checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
buffer.putLong(checksum);
}
private void writePKValue(ByteBuffer buffer) {
- int i;
- for (i = 0; i < PKFieldCnt; i++) {
- buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]), PKValue.getFieldLength(PKFields[i]));
+ if (logSource == LogSource.LOCAL) {
+ for (int i = 0; i < PKFieldCnt; i++) {
+ buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]),
+ PKValue.getFieldLength(PKFields[i]));
+ }
+ } else {
+ //since PKValue is already serialized in remote logs, just put it into buffer
+ buffer.put(PKValue.getFieldData(0), 0, PKValueSize);
}
}
@@ -171,13 +243,57 @@
@Override
public RECORD_STATUS readLogRecord(ByteBuffer buffer) {
int beginOffset = buffer.position();
- //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
- if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
+
+ //read header
+ RECORD_STATUS status = readLogHeader(buffer);
+ if (status != RECORD_STATUS.OK) {
+ buffer.position(beginOffset);
+ return status;
+ }
+
+ //read body
+ status = readLogBody(buffer, false);
+ if (status != RECORD_STATUS.OK) {
+ buffer.position(beginOffset);
+ return status;
+ }
+
+ //attempt to read checksum
+ if (buffer.remaining() < CHKSUM_LEN) {
buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
+ checksum = buffer.getLong();
+ if (checksum != generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN)) {
+ return RECORD_STATUS.BAD_CHKSUM;
+ }
+
+ return RECORD_STATUS.OK;
+ }
+
+ private RECORD_STATUS readLogHeader(ByteBuffer buffer) {
+ //the fixed-length header fields (log source, log type, job id, and node id length) must be available; if the buffer isn't that big, the record is truncated.
+ if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
+ return RECORD_STATUS.TRUNCATED;
+ }
+ logSource = buffer.get();
logType = buffer.get();
jobId = buffer.getInt();
+ nodeIdLength = buffer.getInt();
+ //attempt to read node id
+ if (buffer.remaining() < nodeIdLength) {
+ return RECORD_STATUS.TRUNCATED;
+ }
+ //read node id string
+ nodeId = new String(buffer.array(), buffer.position() + buffer.arrayOffset(), nodeIdLength,
+ java.nio.charset.StandardCharsets.UTF_8);
+ //skip node id string bytes
+ buffer.position(buffer.position() + nodeIdLength);
+
+ return RECORD_STATUS.OK;
+ }
+
+ private RECORD_STATUS readLogBody(ByteBuffer buffer, boolean allocateTupleBuffer) {
if (logType != LogType.FLUSH) {
if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
datasetId = -1;
@@ -185,7 +301,6 @@
} else {
//attempt to read in the dsid, PK hash and PK length
if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
- buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
datasetId = buffer.getInt();
@@ -193,7 +308,6 @@
PKValueSize = buffer.getInt();
//attempt to read in the PK
if (buffer.remaining() < PKValueSize) {
- buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
if (PKValueSize <= 0) {
@@ -201,10 +315,10 @@
}
PKValue = readPKValue(buffer);
}
+
if (logType == LogType.UPDATE) {
//attempt to read in the previous LSN, log size, new value size, and new record type
if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
- buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
prevLSN = buffer.getLong();
@@ -214,34 +328,50 @@
newOp = buffer.get();
newValueSize = buffer.getInt();
if (buffer.remaining() < newValueSize) {
- buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
- newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
+ if (!allocateTupleBuffer) {
+ newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
+ } else {
+ ByteBuffer tupleBuffer = ByteBuffer.allocate(newValueSize);
+ tupleBuffer.put(buffer.array(), buffer.position(), newValueSize);
+ tupleBuffer.flip();
+ newValue = readTuple(tupleBuffer, readNewValue, fieldCnt, newValueSize);
+ //skip tuple bytes
+ buffer.position(buffer.position() + newValueSize);
+ }
} else {
computeAndSetLogSize();
}
} else {
computeAndSetLogSize();
if (buffer.remaining() < DatasetId.BYTES) {
- buffer.position(beginOffset);
return RECORD_STATUS.TRUNCATED;
}
datasetId = buffer.getInt();
resourceId = 0l;
- }
- //atempt to read checksum
- if (buffer.remaining() < CHKSUM_LEN) {
- buffer.position(beginOffset);
- return RECORD_STATUS.TRUNCATED;
- }
- checksum = buffer.getLong();
- if (checksum != generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN)) {
- return RECORD_STATUS.BAD_CHKSUM;
+ computeAndSetLogSize();
}
return RECORD_STATUS.OK;
}
+ @Override
+ public void deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId) {
+ readLogHeader(buffer);
+ if (!remoteRecoveryLog || !nodeId.equals(localNodeId)) {
+ readLogBody(buffer, false);
+ } else {
+ //allocate a private buffer for the tuple because remote recovery logs are kept in memory until they are replayed
+ //TODO when this is redesigned to spill remote recovery logs to disk, this will not be needed
+ readLogBody(buffer, true);
+ }
+
+ if (logType == LogType.FLUSH) {
+ LSN = buffer.getLong();
+ numOfFlushedIndexes = buffer.getInt();
+ }
+ }
+
private ITupleReference readPKValue(ByteBuffer buffer) {
if (buffer.position() + PKValueSize > buffer.limit()) {
throw new BufferUnderflowException();
@@ -251,7 +381,8 @@
return readPKValue;
}
- private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+ private static ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt,
+ int size) {
if (srcBuffer.position() + size > srcBuffer.limit()) {
throw new BufferUnderflowException();
}
@@ -264,18 +395,33 @@
@Override
public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit) {
this.txnCtx = txnCtx;
+ formJobTerminateLogRecord(txnCtx.getJobId().getId(), isCommit, nodeId);
+ }
+
+ @Override
+ public void formJobTerminateLogRecord(int jobId, boolean isCommit, String nodeId) {
this.logType = isCommit ? LogType.JOB_COMMIT : LogType.ABORT;
- this.jobId = txnCtx.getJobId().getId();
+ this.jobId = jobId;
this.datasetId = -1;
this.PKHashValue = -1;
+ setNodeId(nodeId);
computeAndSetLogSize();
}
- public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker) {
+ public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker, int numOfFlushedIndexes) {
+ formFlushLogRecord(datasetId, opTracker, null, numOfFlushedIndexes);
+ }
+
+ public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker, String nodeId,
+ int numberOfIndexes) {
this.logType = LogType.FLUSH;
this.jobId = -1;
this.datasetId = datasetId;
this.opTracker = opTracker;
+ this.numOfFlushedIndexes = numberOfIndexes;
+ if (nodeId != null) {
+ setNodeId(nodeId);
+ }
computeAndSetLogSize();
}
@@ -326,11 +472,15 @@
default:
throw new IllegalStateException("Unsupported Log Type");
}
+
+ logSize += nodeIdLength;
}
@Override
public String getLogRecordForDisplay() {
StringBuilder builder = new StringBuilder();
+ builder.append(" Source : ").append(LogSource.toString(logSource));
+ builder.append(" NodeID : ").append(nodeId);
builder.append(" LSN : ").append(LSN);
builder.append(" LogType : ").append(LogType.toString(logType));
builder.append(" LogSize : ").append(logSize);
@@ -348,6 +498,20 @@
return builder.toString();
}
+ @Override
+ public int serialize(ByteBuffer buffer) {
+ int bufferBegin = buffer.position();
+ writeLogRecordCommonFields(buffer);
+
+ if (logType == LogType.FLUSH) {
+ buffer.putLong(LSN);
+ buffer.putInt(numOfFlushedIndexes);
+ }
+
+ buffer.putLong(LSN);
+ return buffer.position() - bufferBegin;
+ }
+
////////////////////////////////////////////
// getter and setter methods
////////////////////////////////////////////
@@ -438,6 +602,25 @@
}
@Override
+ public int getSerializedLogSize() {
+ return getSerializedLogSize(logType, logSize);
+ }
+
+ private static int getSerializedLogSize(Byte logType, int logSize) {
+ if (logType == LogType.FLUSH) {
+ //LSN
+ logSize += Long.BYTES;
+ //num of indexes
+ logSize += Integer.BYTES;
+ }
+
+ //checksum not included in serialized version
+ logSize -= CHKSUM_LEN;
+
+ return logSize;
+ }
+
+ @Override
public void setLogSize(int logSize) {
this.logSize = logSize;
}
@@ -517,4 +700,52 @@
public PrimaryIndexOperationTracker getOpTracker() {
return opTracker;
}
+
+ @Override
+ public ByteBuffer getSerializedLog() {
+ return serializedLog;
+ }
+
+ public void setSerializedLog(ByteBuffer serializedLog) {
+ this.serializedLog = serializedLog;
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ this.nodeIdLength = nodeId.length();
+ }
+
+ public IReplicationThread getReplicationThread() {
+ return replicationThread;
+ }
+
+ @Override
+ public void setReplicationThread(IReplicationThread replicationThread) {
+ this.replicationThread = replicationThread;
+ }
+
+ @Override
+ public void setLogSource(byte logSource) {
+ this.logSource = logSource;
+ }
+
+ @Override
+ public byte getLogSource() {
+ return logSource;
+ }
+
+ public int getNumOfFlushedIndexes() {
+ return numOfFlushedIndexes;
+ }
+
+ public void setNumOfFlushedIndexes(int numOfFlushedIndexes) {
+ this.numOfFlushedIndexes = numOfFlushedIndexes;
+ }
+
}
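
On the receiving side, the new deserialize()/setLogSource() pair is what lets a replica rebuild a record from the checksum-free serialized form. A minimal sketch follows, under the assumption that LogRecord keeps its existing no-argument constructor; the method name and framing are illustrative only.

package org.apache.asterix.common.transactions;

import java.nio.ByteBuffer;

//Sketch only: not part of this change.
public class RemoteLogReadSketch {

    public static LogRecord readReplicatedLog(ByteBuffer replicatedLog, String localNodeId) {
        LogRecord logRecord = new LogRecord(); //assumes the existing no-arg constructor
        //replicated records are shipped without a checksum (see getSerializedLogSize()),
        //so deserialize() is used instead of readLogRecord(); passing false skips the
        //extra tuple buffer that is only needed for in-memory remote recovery logs
        logRecord.deserialize(replicatedLog, false, localNodeId);
        //tag the record as remote so the LOCAL-only paths (e.g., the serialized copy) are skipped
        logRecord.setLogSource(LogSource.REMOTE);
        return logRecord;
    }
}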
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
new file mode 100644
index 0000000..34e81db
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+public class LogSource {
+
+ public static final byte LOCAL = 0;
+ public static final byte REMOTE = 1;
+ public static final byte REMOTE_RECOVERY = 2;
+
+ private static final String STRING_LOCAL = "LOCAL";
+ private static final String STRING_REMOTE = "REMOTE";
+ private static final String STRING_REMOTE_RECOVERY = "REMOTE_RECOVERY";
+
+ private static final String STRING_INVALID_LOG_SOURCE = "INVALID_LOG_SOURCE";
+
+ public static String toString(byte logSource) {
+ switch (logSource) {
+ case LOCAL:
+ return STRING_LOCAL;
+ case REMOTE:
+ return STRING_REMOTE;
+ case REMOTE_RECOVERY:
+ return STRING_REMOTE_RECOVERY;
+ default:
+ return STRING_INVALID_LOG_SOURCE;
+ }
+ }
+
+}
\ No newline at end of file
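
The source tag lets callers distinguish locally generated records from replicated ones. A small hypothetical helper (not part of this change) shows the kind of check it enables, e.g. to avoid re-replicating records received from peers.

package org.apache.asterix.common.transactions;

//Sketch only: a hypothetical utility, not part of this change.
public final class LogSourceUtil {
    private LogSourceUtil() {
    }

    public static boolean isSelfGenerated(byte logSource) {
        //REMOTE and REMOTE_RECOVERY records originated on another node
        return logSource == LogSource.LOCAL;
    }
}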
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index d9b62c2..d3203d5 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -44,7 +44,11 @@
<xs:element name="http_port" type="xs:integer" />
<xs:element name="debug_port" type="xs:integer" />
<xs:element name="metadata_node" type="xs:string" />
-
+ <xs:element name="enabled" type="xs:boolean" />
+ <xs:element name="replication_port" type="xs:integer" />
+ <xs:element name="replication_factor" type="xs:integer" />
+ <xs:element name="replication_store" type="xs:string" />
+ <xs:element name="replication_time_out" type="xs:integer" />
<!-- definition of complex elements -->
<xs:element name="working_dir">
@@ -72,6 +76,18 @@
</xs:complexType>
</xs:element>
+ <xs:element name="data_replication">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element ref="cl:enabled" />
+ <xs:element ref="cl:replication_port" />
+ <xs:element ref="cl:replication_factor" />
+ <xs:element ref="cl:replication_store" />
+ <xs:element ref="cl:replication_time_out" />
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
<xs:element name="property">
<xs:complexType>
<xs:sequence>
@@ -126,6 +142,7 @@
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:working_dir" />
<xs:element ref="cl:metadata_node" />
+ <xs:element ref="cl:data_replication" minOccurs="0" />
<xs:element ref="cl:master_node" />
<xs:element ref="cl:node" maxOccurs="unbounded" />
<xs:element ref="cl:substitute_nodes" />