Exclude Temporary Resources From Replication
- Exclude temporary resources from replication.
- Remove flush logs from temporary datasets.
- Ignore takeover partitions request if NC is shutting down.
- Stop NCs on different threads to allow the replica shutdown
notification to be sent when replication is enabled.
Change-Id: I9a52557bf1f3e7632dd826384280abdaa186f672
Reviewed-on: https://asterix-gerrit.ics.uci.edu/778
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index c67eb70..cc50b75 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,6 +19,7 @@
package org.apache.asterix.api.common;
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@@ -138,11 +139,31 @@
}
public static void deinit(boolean deleteOldInstanceData) throws Exception {
+ //stop NCs
+ ArrayList<Thread> stopNCThreads = new ArrayList<>();
for (int n = 0; n < ncs.length; ++n) {
- if (ncs[n] != null)
- ncs[n].stop();
-
+ NodeControllerService nodeControllerService = ncs[n];
+ if (nodeControllerService != null) {
+ Thread ncStopThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ nodeControllerService.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ stopNCThreads.add(ncStopThread);
+ ncStopThread.start();
+ }
}
+
+ //make sure all NCs stopped
+    for (Thread stopNcThread : stopNCThreads) {
+    stopNcThread.join();
+    }
+
if (cc != null) {
cc.stop();
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917..13b0189 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -124,14 +124,17 @@
private void handleTakeoverPartitons(IMessage message) throws Exception {
TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
- try {
- IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
- remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
- } finally {
- //send response after takeover is completed
- TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
- appContext.getTransactionSubsystem().getId(), msg.getPartitions());
- sendMessage(reponse, null);
+ //if the NC is shutting down, it should ignore takeover partitions request
+ if (!appContext.isShuttingdown()) {
+ try {
+    IRemoteRecoveryManager remoteRecoveryManager = appContext.getRemoteRecoveryManager();
+    remoteRecoveryManager.takeoverPartitons(msg.getPartitions());
+ } finally {
+ //send response after takeover is completed
+    TakeoverPartitionsResponseMessage response = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
+    appContext.getTransactionSubsystem().getId(), msg.getPartitions());
+    sendMessage(response, null);
+ }
}
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f667bd8..c5f6915 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -65,8 +65,8 @@
private final int numPartitions;
public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
- ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID,
- ILogManager logManager, int numPartitions) {
+ ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager,
+ int numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
@@ -111,6 +111,7 @@
if (!dsInfo.isRegistered) {
dsInfo.isExternal = !index.hasMemoryComponents();
dsInfo.isRegistered = true;
+ dsInfo.durable = ((ILSMIndex) index).isDurable();
}
if (dsInfo.indexes.containsKey(resourceID)) {
@@ -338,6 +339,7 @@
return dvbcs;
}
}
+
@Override
public ILSMOperationTracker getOperationTracker(int datasetID) {
synchronized (datasetOpTrackers) {
@@ -400,6 +402,7 @@
private boolean isExternal;
private boolean isRegistered;
private boolean memoryAllocated;
+ private boolean durable;
public DatasetInfo(int datasetID) {
this.indexes = new HashMap<Long, IndexInfo>();
@@ -480,7 +483,11 @@
public String toString() {
return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
+ ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
- + memoryAllocated;
+ + memoryAllocated + ", isDurable: " + durable;
+ }
+
+ public boolean isDurable() {
+ return durable;
}
}
@@ -536,7 +543,7 @@
* This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
*/
private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
- if (!dsInfo.isExternal) {
+ if (!dsInfo.isExternal && dsInfo.durable) {
synchronized (logRecord) {
TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, logManager.getNodeId(),
dsInfo.indexes.size());
@@ -731,8 +738,10 @@
List<IVirtualBufferCache> vbcs = new ArrayList<>();
for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
- new VirtualBufferCache(new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
- Integer.toString(datasetID)), storageProperties.getMemoryComponentPageSize(),
+ new VirtualBufferCache(
+ new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
+ Integer.toString(datasetID)),
+ storageProperties.getMemoryComponentPageSize(),
numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
vbcs.add(vbc);
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index e5a3473..dac05e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -121,13 +121,15 @@
}
}
}
- LogRecord logRecord = new LogRecord();
- TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(),
- dsInfo.getDatasetIndexes().size());
- try {
- logManager.log(logRecord);
- } catch (ACIDException e) {
- throw new HyracksDataException("could not write flush log", e);
+ if (dsInfo.isDurable()) {
+ LogRecord logRecord = new LogRecord();
+ TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(),
+ dsInfo.getDatasetIndexes().size());
+ try {
+ logManager.log(logRecord);
+ } catch (ACIDException e) {
+ throw new HyracksDataException("could not write flush log", e);
+ }
}
flushLogCreated = true;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 5b4035c..78b06fb 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -67,7 +67,7 @@
return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
}
- public static int getPartitonNumFromName(String name) {
+ public static int getPartitionNumFromName(String name) {
return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
}
}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index 2bf5fa3..a349e51 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -69,7 +69,7 @@
this.fileName = tokens[arraySize - 1];
this.idxName = tokens[arraySize - 2];
this.dataverse = tokens[arraySize - 3];
- this.partition = StoragePathUtil.getPartitonNumFromName(tokens[arraySize - 4]);
+ this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]);
}
public void serialize(OutputStream out) throws IOException {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 3a1e729..561b144 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -337,14 +337,25 @@
}
private void createReplicationJob(ReplicationOperation operation, String filePath) throws HyracksDataException {
- filesToBeReplicated.clear();
- filesToBeReplicated.add(filePath);
- AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation,
- ReplicationExecutionType.SYNC, filesToBeReplicated);
- try {
- replicationManager.submitJob(job);
- } catch (IOException e) {
- throw new HyracksDataException(e);
+ /**
+ * Durable resources path format:
+ * /partition/dataverse/idx/fileName
+ * Temporary resources path format:
+ * /partition/TEMP_DATASETS_STORAGE_FOLDER/dataverse/idx/fileName
+ */
+ String[] fileNameTokens = filePath.split(File.separator);
+ String partitionDir = fileNameTokens[fileNameTokens.length - 4];
+ //exclude temporary datasets resources
+ if (!partitionDir.equals(StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)) {
+ filesToBeReplicated.clear();
+ filesToBeReplicated.add(filePath);
+ AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation,
+ ReplicationExecutionType.SYNC, filesToBeReplicated);
+ try {
+ replicationManager.submitJob(job);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
}
}
@@ -454,14 +465,14 @@
*/
public static String getResourceRelativePath(String resourceAbsolutePath) {
String[] tokens = resourceAbsolutePath.split(File.separator);
- //partiton/dataverse/idx/fileName
+ //partition/dataverse/idx/fileName
return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
+ tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
}
public static int getResourcePartition(String resourceAbsolutePath) {
String[] tokens = resourceAbsolutePath.split(File.separator);
- //partiton/dataverse/idx/fileName
- return StoragePathUtil.getPartitonNumFromName(tokens[tokens.length - 4]);
+ //partition/dataverse/idx/fileName
+ return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 4]);
}
}