Allow Replication to be Enabled on Virtual Cluster
- Allow replication port assignment per NC.
- Allow replication to be enabled on virtual cluster.
- Wait for JOB_ABORT ACK from remote replicas.
- Fix LSM component mask file name.
- Fix index directory deletion on index drop.
- Eliminate multiple partition takeover requests.
- Free LogFlusher thread from sending replication ACKs.
- Fix possible deadlock between LogFlusher and Logs Replication Thread.
- Remove wait for FLUSH_LOG for replicated LSM components:
This wait is not needed since on node failure, complete remote recovery is done.
Change-Id: I34a38f59c4915a19242adb6a4eaa6ee1c82d2372
Reviewed-on: https://asterix-gerrit.ics.uci.edu/743
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index a568464..c67eb70 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.api.common;
import java.io.File;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@@ -83,6 +84,7 @@
ncConfig1.nodeId = ncName;
ncConfig1.resultTTL = 30000;
ncConfig1.resultSweepThreshold = 1000;
+ ncConfig1.appArgs = Arrays.asList("-virtual-NC");
String tempPath = System.getProperty(IO_DIR_KEY);
if (tempPath.endsWith(File.separator)) {
tempPath = tempPath.substring(0, tempPath.length() - 1);
@@ -109,8 +111,19 @@
}
}
ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
- ncs[n] = new NodeControllerService(ncConfig1);
- ncs[n].start();
+ NodeControllerService nodeControllerService = new NodeControllerService(ncConfig1);
+ ncs[n] = nodeControllerService;
+ Thread ncStartThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ nodeControllerService.start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ ncStartThread.start();
++n;
}
hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4922ae6..643bb16 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -64,6 +64,9 @@
@Option(name = "-initial-run", usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
public boolean initialRun = false;
+ @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is running on virtual cluster (default: false)", required = false)
+ public boolean virtualNC = false;
+
private INCApplicationContext ncApplicationContext = null;
private IAsterixAppRuntimeContext runtimeContext;
private String nodeId;
@@ -88,7 +91,6 @@
ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager()));
ncApplicationContext = ncAppCtx;
-
nodeId = ncApplicationContext.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -120,7 +122,8 @@
LOGGER.info("System is in a state: " + systemState);
}
- if (replicationEnabled) {
+ //do not attempt to perform remote recovery if this is a virtual NC
+ if (replicationEnabled && !virtualNC) {
if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
//Try to perform remote recovery
IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 731113b..ff03ab6 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,4 +98,10 @@
<description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
</description>
</property>
+ <property>
+ <name>log.level</name>
+ <value>WARNING</value>
+ <description>The minimum log level to be displayed. (Default = INFO)
+ </description>
+ </property>
</asterixConfiguration>
diff --git a/asterix-app/src/main/resources/cluster.xml b/asterix-app/src/main/resources/cluster.xml
new file mode 100644
index 0000000..8f0b694
--- /dev/null
+++ b/asterix-app/src/main/resources/cluster.xml
@@ -0,0 +1,49 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+ <instance_name>asterix</instance_name>
+ <store>storage</store>
+
+ <data_replication>
+ <enabled>false</enabled>
+ <replication_port>2016</replication_port>
+ <replication_factor>2</replication_factor>
+ <auto_failover>false</auto_failover>
+ <replication_time_out>30</replication_time_out>
+ </data_replication>
+
+ <master_node>
+ <id>master</id>
+ <client_ip>127.0.0.1</client_ip>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <replication_port>2016</replication_port>
+ </node>
+ <node>
+ <id>nc2</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <replication_port>2017</replication_port>
+ </node>
+</cluster>
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index cb7bcab..5d31d9a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -77,10 +77,15 @@
}
public int getDataReplicationPort(String nodeId) {
- if (cluster != null) {
- return cluster.getDataReplication().getReplicationPort().intValue();
+ if (cluster != null && cluster.getDataReplication() != null) {
+ for (int i = 0; i < cluster.getNode().size(); i++) {
+ Node node = cluster.getNode().get(i);
+ if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+ return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
+ : cluster.getDataReplication().getReplicationPort().intValue();
+ }
+ }
}
-
return REPLICATION_DATAPORT_DEFAULT;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
index 9226a66..a88b82a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.replication;
+import java.nio.channels.SocketChannel;
+
import org.apache.asterix.common.transactions.LogRecord;
public interface IReplicationThread extends Runnable {
@@ -29,4 +31,9 @@
* The log that has been flushed.
*/
public void notifyLogReplicationRequester(LogRecord logRecord);
+
+ /**
+ * @return The replication client socket channel.
+ */
+ public SocketChannel getReplicationClientSocket();
}
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 935d33f..ae09b16 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -123,6 +123,7 @@
<xs:element ref="cl:txn_log_dir" minOccurs="0" />
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:debug_port" minOccurs="0" />
+ <xs:element ref="cl:replication_port" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 29765fd..c92262c 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
public class EventDriver {
public static final String CLIENT_NODE_ID = "client_node";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
private static String eventsDir;
private static Events events;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index b83faa2..57648b8 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
+import java.math.BigInteger;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -191,7 +192,7 @@
String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
.getMasterNode().getJavaHome();
return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
- null, null, cluster.getMasterNode().getDebugPort());
+ null, null, cluster.getMasterNode().getDebugPort(), null);
}
List<Node> nodeList = cluster.getNode();
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 9559394..4037eaf 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -105,7 +105,7 @@
MasterNode masterNode = cluster.getMasterNode();
Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
- masterNode.getLogDir(), null, null, null);
+ masterNode.getLogDir(), null, null, null, null);
ipAddresses.add(masterNode.getClusterIp());
valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index 3e37694..e7bf3bf 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -62,9 +62,10 @@
private ClusterManager() {
Cluster asterixCluster = AsterixClusterProperties.INSTANCE.getCluster();
- String eventHome = asterixCluster == null ? null : asterixCluster.getWorkingDir().getDir();
+ String eventHome = asterixCluster == null ? null
+ : asterixCluster.getWorkingDir() == null ? null : asterixCluster.getWorkingDir().getDir();
- if (asterixCluster != null) {
+ if (eventHome != null) {
String asterixDir = System.getProperty("user.dir") + File.separator + "asterix";
File configFile = new File(System.getProperty("user.dir") + File.separator + "configuration.xml");
Configuration configuration = null;
@@ -74,8 +75,8 @@
Unmarshaller unmarshaller = configCtx.createUnmarshaller();
configuration = (Configuration) unmarshaller.unmarshal(configFile);
AsterixEventService.initialize(configuration, asterixDir, eventHome);
- client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE
- .getCluster());
+ client = AsterixEventService
+ .getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
lookupService = ServiceProvider.INSTANCE.getLookupService();
if (!lookupService.isRunning(configuration)) {
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 3173525..abfcf67 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -371,6 +371,7 @@
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
}
}
+ break;
}
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index d2380c1..1ff6cc4 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -37,7 +37,7 @@
/**
* All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
*/
- public static final String JOB_COMMIT_ACK = "$";
+ public static final String JOB_REPLICATION_ACK = "$";
public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
@@ -297,11 +297,11 @@
}
public static int getJobIdFromLogAckMessage(String msg) {
- return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1)));
+ return Integer.parseInt(msg.substring((msg.indexOf(JOB_REPLICATION_ACK) + 1)));
}
public static String getNodeIdFromLogAckMessage(String msg) {
- return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK));
+ return msg.substring(0, msg.indexOf(JOB_REPLICATION_ACK));
}
/**
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
index f61fbc6..3b2aff7 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
@@ -24,18 +24,9 @@
private String remoteNodeID;
private long remoteLSN;
- private boolean isFlushed = false;
private long localLSN;
public AtomicInteger numOfFlushedIndexes = new AtomicInteger();
- public boolean isFlushed() {
- return isFlushed;
- }
-
- public void setFlushed(boolean isFlushed) {
- this.isFlushed = isFlushed;
- }
-
public String getRemoteNodeID() {
return remoteNodeID;
}
@@ -66,7 +57,10 @@
sb.append("Remote Node: " + remoteNodeID);
sb.append(" Remote LSN: " + remoteLSN);
sb.append(" Local LSN: " + localLSN);
- sb.append(" isFlushed : " + isFlushed);
return sb.toString();
}
+
+ public String getNodeUniqueLSN() {
+ return TxnLogUtil.getNodeUniqueLSN(remoteNodeID, remoteLSN);
+ }
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java
new file mode 100644
index 0000000..f51a64d
--- /dev/null
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.logging;
+
+public class TxnLogUtil {
+
+ private TxnLogUtil() {
+ //prevent util class construction
+ }
+
+ /**
+ * @param nodeId
+ * @param LSN
+ * @return Concatenation of nodeId and LSN
+ */
+ public static String getNodeUniqueLSN(String nodeId, long LSN) {
+ return nodeId + LSN;
+ }
+}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index b9447af..331116b 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -95,10 +95,13 @@
private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
private final static int INTIAL_BUFFER_SIZE = 4000; //4KB
private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
+ private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
- private final Map<Long, RemoteLogMapping> localLSN2RemoteLSNMap;
+ private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
private final LSMComponentsSyncService lsmComponentLSNMappingService;
private final Set<Integer> nodeHostedPartitions;
+ private final ReplicationNotifier replicationNotifier;
+ private final Object flushLogslock = new Object();
public ReplicationChannel(String nodeId, AsterixReplicationProperties replicationProperties, ILogManager logManager,
IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
@@ -110,9 +113,11 @@
this.replicationProperties = replicationProperties;
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<LSMComponentLSNSyncTask>();
+ pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
lsmComponentId2PropertiesMap = new ConcurrentHashMap<String, LSMComponentProperties>();
- localLSN2RemoteLSNMap = new ConcurrentHashMap<Long, RemoteLogMapping>();
+ replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
lsmComponentLSNMappingService = new LSMComponentsSyncService();
+ replicationNotifier = new ReplicationNotifier();
replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
.getAppContext()).getMetadataProperties().getNodePartitions();
@@ -140,7 +145,7 @@
dataPort);
serverSocketChannel.socket().bind(replicationChannelAddress);
lsmComponentLSNMappingService.start();
-
+ replicationNotifier.start();
LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort);
//start accepting replication requests
@@ -152,7 +157,7 @@
}
} catch (IOException e) {
throw new IllegalStateException(
- "Could not opened replication channel @ IP Address: " + nodeIP + ":" + dataPort, e);
+ "Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e);
}
}
@@ -164,13 +169,13 @@
if (remainingFile == 0) {
if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null) {
//if this LSN wont be used for any other index, remove it
- if (localLSN2RemoteLSNMap.containsKey(lsmCompProp.getReplicaLSN())) {
- int remainingIndexes = localLSN2RemoteLSNMap.get(lsmCompProp.getReplicaLSN()).numOfFlushedIndexes
- .decrementAndGet();
+ if (replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
+ int remainingIndexes = replicaUniqueLSN2RemoteMapping
+ .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
if (remainingIndexes == 0) {
//Note: there is a chance that this is never deleted because some index in the dataset was not flushed because it is empty.
//This could be solved by passing only the number of successfully flushed indexes
- localLSN2RemoteLSNMap.remove(lsmCompProp.getReplicaLSN());
+ replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
}
}
}
@@ -182,22 +187,6 @@
}
}
- /**
- * @param replicaId
- * the remote replica id this log belongs to.
- * @param remoteLSN
- * the remote LSN received from the remote replica.
- * @return The local log mapping if found. Otherwise null.
- */
- private RemoteLogMapping getRemoteLogMapping(String replicaId, long remoteLSN) {
- for (RemoteLogMapping mapping : localLSN2RemoteLSNMap.values()) {
- if (mapping.getRemoteLSN() == remoteLSN && mapping.getRemoteNodeID().equals(replicaId)) {
- return mapping;
- }
- }
- return null;
- }
-
@Override
public void close() throws IOException {
if (!serverSocketChannel.isOpen()) {
@@ -538,56 +527,65 @@
}
break;
case LogType.JOB_COMMIT:
- LogRecord jobCommitLog = new LogRecord();
- TransactionUtil.formJobTerminateLogRecord(jobCommitLog, remoteLog.getJobId(), true);
- jobCommitLog.setReplicationThread(this);
- jobCommitLog.setLogSource(LogSource.REMOTE);
- logManager.log(jobCommitLog);
+ case LogType.ABORT:
+ LogRecord jobTerminationLog = new LogRecord();
+ TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
+ remoteLog.getLogType() == LogType.JOB_COMMIT);
+ jobTerminationLog.setReplicationThread(this);
+ jobTerminationLog.setLogSource(LogSource.REMOTE);
+ logManager.log(jobTerminationLog);
break;
case LogType.FLUSH:
- LogRecord flushLog = new LogRecord();
- TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null, remoteLog.getNodeId(),
- remoteLog.getNumOfFlushedIndexes());
- flushLog.setReplicationThread(this);
- flushLog.setLogSource(LogSource.REMOTE);
- synchronized (localLSN2RemoteLSNMap) {
- logManager.log(flushLog);
- //store mapping information for flush logs to use them in incoming LSM components.
- RemoteLogMapping flushLogMap = new RemoteLogMapping();
- flushLogMap.setRemoteLSN(remoteLog.getLSN());
- flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
- flushLogMap.setLocalLSN(flushLog.getLSN());
- flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
- localLSN2RemoteLSNMap.put(flushLog.getLSN(), flushLogMap);
- localLSN2RemoteLSNMap.notifyAll();
+ //store mapping information for flush logs to use them in incoming LSM components.
+ RemoteLogMapping flushLogMap = new RemoteLogMapping();
+ flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+ flushLogMap.setRemoteLSN(remoteLog.getLSN());
+ logManager.log(remoteLog);
+ //the log LSN value is updated by logManager.log(.) to a local value
+ flushLogMap.setLocalLSN(remoteLog.getLSN());
+ flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+ replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+ synchronized (flushLogslock) {
+ flushLogslock.notify();
}
break;
default:
- throw new ACIDException("Unsupported LogType: " + remoteLog.getLogType());
+ LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
}
}
- //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and FLUSH log types
+ //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and JOB_ABORT log types.
@Override
public void notifyLogReplicationRequester(LogRecord logRecord) {
- //Note: this could be optimized by moving this to a different thread and freeing the LogPage thread faster
- if (logRecord.getLogType() == LogType.JOB_COMMIT) {
- //send ACK to requester
+ pendingNotificationRemoteLogsQ.offer(logRecord);
+ }
+
+ @Override
+ public SocketChannel getReplicationClientSocket() {
+ return socketChannel;
+ }
+ }
+
+ /**
+ * This thread is responsible for sending JOB_COMMIT/ABORT ACKs to replication clients.
+ */
+ private class ReplicationNotifier extends Thread {
+ @Override
+ public void run() {
+ Thread.currentThread().setName("ReplicationNotifier Thread");
+ while (true) {
try {
- socketChannel.socket().getOutputStream()
- .write((localNodeID + ReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
- .getBytes());
- socketChannel.socket().getOutputStream().flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
- } else if (logRecord.getLogType() == LogType.FLUSH) {
- synchronized (localLSN2RemoteLSNMap) {
- RemoteLogMapping remoteLogMap = localLSN2RemoteLSNMap.get(logRecord.getLSN());
- synchronized (remoteLogMap) {
- remoteLogMap.setFlushed(true);
- remoteLogMap.notifyAll();
+ LogRecord logRecord = pendingNotificationRemoteLogsQ.take();
+ //send ACK to requester
+ try {
+ logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
+ .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId()
+ + System.lineSeparator()).getBytes());
+ } catch (IOException e) {
+ LOGGER.severe("Failed to send job replication ACK " + logRecord.getLogRecordForDisplay());
}
+ } catch (InterruptedException e1) {
+ LOGGER.severe("ReplicationNotifier Thread interrupted.");
}
}
}
@@ -629,26 +627,23 @@
return;
}
+ //path to the LSM component file
+ Path path = Paths.get(syncTask.getComponentFilePath());
if (lsmCompProp.getReplicaLSN() == null) {
if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
//need to look up LSN mapping from memory
- RemoteLogMapping remoteLogMap = getRemoteLogMapping(lsmCompProp.getNodeId(), remoteLSN);
-
- //wait until flush log arrives
- while (remoteLogMap == null) {
- synchronized (localLSN2RemoteLSNMap) {
- localLSN2RemoteLSNMap.wait();
- }
- remoteLogMap = getRemoteLogMapping(lsmCompProp.getNodeId(), remoteLSN);
- }
-
- //wait until the log is flushed locally before updating the disk component LSN
- synchronized (remoteLogMap) {
- while (!remoteLogMap.isFlushed()) {
- remoteLogMap.wait();
+ RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+ if (remoteLogMap == null) {
+ synchronized (flushLogslock) {
+ remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+ //wait until flush log arrives, and verify the LSM component file still exists
+ //The component file could be deleted if its NC fails.
+ while (remoteLogMap == null && Files.exists(path)) {
+ flushLogslock.wait();
+ remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+ }
}
}
-
lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
} else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
//need to load the LSN mapping from disk
@@ -665,13 +660,11 @@
*
*/
mappingLSN = logManager.getAppendLSN();
- } else {
- lsmCompProp.setReplicaLSN(mappingLSN);
}
+ lsmCompProp.setReplicaLSN(mappingLSN);
}
}
- Path path = Paths.get(syncTask.getComponentFilePath());
if (Files.notExists(path)) {
/*
* This could happen when a merged component arrives and deletes the flushed
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 7243629..93d1085 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -213,7 +213,7 @@
@Override
public void replicateLog(ILogRecord logRecord) {
- if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
//if replication is suspended, wait until it is resumed.
while (replicationSuspended.get()) {
synchronized (replicationSuspended) {
@@ -734,11 +734,7 @@
return true;
} else {
- if (!replicationJobsPendingAcks.containsKey(logRecord.getJobId())) {
- synchronized (replicationJobsPendingAcks) {
- replicationJobsPendingAcks.put(logRecord.getJobId(), logRecord);
- }
- }
+ replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord);
return false;
}
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 841a99f..9749c7a 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.replication.logging.TxnLogUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
@@ -96,6 +97,8 @@
public String getMaskPath(ReplicaResourcesManager resourceManager) {
if (maskPath == null) {
LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
+ //split the index file path to get the LSM component file name
+ afp.splitFileName();
maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName()
+ ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
}
@@ -147,10 +150,6 @@
return nodeId;
}
- public void setNodeId(String nodeId) {
- this.nodeId = nodeId;
- }
-
public int getNumberOfFiles() {
return numberOfFiles.get();
}
@@ -178,4 +177,8 @@
public void setOpType(LSMOperationType opType) {
this.opType = opType;
}
+
+ public String getNodeUniqueLSN() {
+ return TxnLogUtil.getNodeUniqueLSN(nodeId, originalLSN);
+ }
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 413fd7ac..41fc0b8 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -67,14 +67,12 @@
String indexPath = getIndexPath(afp);
if (indexPath != null) {
if (afp.isLSMComponentFile()) {
- String backupFilePath = indexPath + File.separator + afp.getFileName();
-
- //delete file
- File destFile = new File(backupFilePath);
+ //delete index file
+ String indexFilePath = indexPath + File.separator + afp.getFileName();
+ File destFile = new File(indexFilePath);
FileUtils.deleteQuietly(destFile);
} else {
- //delete index files
- indexPath = indexPath.substring(0, indexPath.lastIndexOf(File.separator));
+ //delete index directory
FileUtils.deleteQuietly(new File(indexPath));
}
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 6060dd7..502e9c7 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -134,8 +134,7 @@
flushQ.offer(logRecord);
}
} else if (logRecord.getLogSource() == LogSource.REMOTE) {
-
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.FLUSH) {
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
remoteJobsQ.offer(logRecord);
}
}
@@ -276,7 +275,7 @@
notifyFlushTerminator();
}
} else if (logRecord.getLogSource() == LogSource.REMOTE) {
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.FLUSH) {
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
notifyReplicationTerminator();
}
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 8363ff1..efd66a8 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -72,13 +72,8 @@
//ignore
}
}
- }
- }
- //wait for job ACK from replicas
- //TODO should JOB_ABORT be added as well?
- if ((logRecord.getLogType() == LogType.JOB_COMMIT) && !replicationManager.hasBeenReplicated(logRecord)) {
- synchronized (logRecord) {
+ //wait for job Commit/Abort ACK from replicas
while (!replicationManager.hasBeenReplicated(logRecord)) {
try {
logRecord.wait();