Exclude Temporary Resources From Replication

- Exclude temporary resources from replication.
- Remove flush logs from temporary datasets.
- Ignore takeover partitions request if NC is shutting down.
- Stop NCs on different threads to allow replica shutting down
  notification to be sent when replication is enabled.

Change-Id: I9a52557bf1f3e7632dd826384280abdaa186f672
Reviewed-on: https://asterix-gerrit.ics.uci.edu/778
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index c67eb70..cc50b75 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.api.common;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
@@ -138,11 +139,31 @@
     }
 
     public static void deinit(boolean deleteOldInstanceData) throws Exception {
+        //stop NCs
+        ArrayList<Thread> stopNCThreads = new ArrayList<>();
         for (int n = 0; n < ncs.length; ++n) {
-            if (ncs[n] != null)
-                ncs[n].stop();
-
+            NodeControllerService nodeControllerService = ncs[n];
+            if (nodeControllerService != null) {
+                Thread ncStopThread = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            nodeControllerService.stop();
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                };
+                stopNCThreads.add(ncStopThread);
+                ncStopThread.start();
+            }
         }
+
+        //make sure all NCs stopped
+        for (Thread stopNcTheard : stopNCThreads) {
+            stopNcTheard.join();
+        }
+
         if (cc != null) {
             cc.stop();
         }
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917..13b0189 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -124,14 +124,17 @@
 
     private void handleTakeoverPartitons(IMessage message) throws Exception {
         TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
-        try {
-            IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-            remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
-        } finally {
-            //send response after takeover is completed
-            TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
-                    appContext.getTransactionSubsystem().getId(), msg.getPartitions());
-            sendMessage(reponse, null);
+        //if the NC is shutting down, it should ignore takeover partitions request
+        if (!appContext.isShuttingdown()) {
+            try {
+                IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+                remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
+            } finally {
+                //send response after takeover is completed
+                TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
+                        appContext.getTransactionSubsystem().getId(), msg.getPartitions());
+                sendMessage(reponse, null);
+            }
         }
     }
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f667bd8..c5f6915 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -65,8 +65,8 @@
     private final int numPartitions;
 
     public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
-                                   ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID,
-                                   ILogManager logManager, int numPartitions) {
+            ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager,
+            int numPartitions) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
@@ -111,6 +111,7 @@
         if (!dsInfo.isRegistered) {
             dsInfo.isExternal = !index.hasMemoryComponents();
             dsInfo.isRegistered = true;
+            dsInfo.durable = ((ILSMIndex) index).isDurable();
         }
 
         if (dsInfo.indexes.containsKey(resourceID)) {
@@ -338,6 +339,7 @@
             return dvbcs;
         }
     }
+
     @Override
     public ILSMOperationTracker getOperationTracker(int datasetID) {
         synchronized (datasetOpTrackers) {
@@ -400,6 +402,7 @@
         private boolean isExternal;
         private boolean isRegistered;
         private boolean memoryAllocated;
+        private boolean durable;
 
         public DatasetInfo(int datasetID) {
             this.indexes = new HashMap<Long, IndexInfo>();
@@ -480,7 +483,11 @@
         public String toString() {
             return "DatasetID: " + datasetID + ", isOpen: " + isOpen + ", refCount: " + referenceCount
                     + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
-                    + memoryAllocated;
+                    + memoryAllocated + ", isDurable: " + durable;
+        }
+
+        public boolean isDurable() {
+            return durable;
         }
     }
 
@@ -536,7 +543,7 @@
      * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
      */
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
-        if (!dsInfo.isExternal) {
+        if (!dsInfo.isExternal && dsInfo.durable) {
             synchronized (logRecord) {
                 TransactionUtil.formFlushLogRecord(logRecord, dsInfo.datasetID, null, logManager.getNodeId(),
                         dsInfo.indexes.size());
@@ -731,8 +738,10 @@
             List<IVirtualBufferCache> vbcs = new ArrayList<>();
             for (int i = 0; i < storageProperties.getMemoryComponentsNum(); i++) {
                 MultitenantVirtualBufferCache vbc = new MultitenantVirtualBufferCache(
-                        new VirtualBufferCache(new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
-                                Integer.toString(datasetID)), storageProperties.getMemoryComponentPageSize(),
+                        new VirtualBufferCache(
+                                new ResourceHeapBufferAllocator(DatasetLifecycleManager.this,
+                                        Integer.toString(datasetID)),
+                                storageProperties.getMemoryComponentPageSize(),
                                 numPages / storageProperties.getMemoryComponentsNum() / numPartitions));
                 vbcs.add(vbc);
             }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index e5a3473..dac05e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -121,13 +121,15 @@
                     }
                 }
             }
-            LogRecord logRecord = new LogRecord();
-            TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(),
-                    dsInfo.getDatasetIndexes().size());
-            try {
-                logManager.log(logRecord);
-            } catch (ACIDException e) {
-                throw new HyracksDataException("could not write flush log", e);
+            if (dsInfo.isDurable()) {
+                LogRecord logRecord = new LogRecord();
+                TransactionUtil.formFlushLogRecord(logRecord, datasetID, this, logManager.getNodeId(),
+                        dsInfo.getDatasetIndexes().size());
+                try {
+                    logManager.log(logRecord);
+                } catch (ACIDException e) {
+                    throw new HyracksDataException("could not write flush log", e);
+                }
             }
 
             flushLogCreated = true;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 5b4035c..78b06fb 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -67,7 +67,7 @@
         return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
     }
 
-    public static int getPartitonNumFromName(String name) {
+    public static int getPartitionNumFromName(String name) {
         return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
     }
 }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index 2bf5fa3..a349e51 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -69,7 +69,7 @@
         this.fileName = tokens[arraySize - 1];
         this.idxName = tokens[arraySize - 2];
         this.dataverse = tokens[arraySize - 3];
-        this.partition = StoragePathUtil.getPartitonNumFromName(tokens[arraySize - 4]);
+        this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]);
     }
 
     public void serialize(OutputStream out) throws IOException {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 3a1e729..561b144 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -337,14 +337,25 @@
     }
 
     private void createReplicationJob(ReplicationOperation operation, String filePath) throws HyracksDataException {
-        filesToBeReplicated.clear();
-        filesToBeReplicated.add(filePath);
-        AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation,
-                ReplicationExecutionType.SYNC, filesToBeReplicated);
-        try {
-            replicationManager.submitJob(job);
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
+        /**
+         * Durable resources path format:
+         * /partition/dataverse/idx/fileName
+         * Temporary resources path format:
+         * /partition/TEMP_DATASETS_STORAGE_FOLDER/dataverse/idx/fileName
+         */
+        String[] fileNameTokens = filePath.split(File.separator);
+        String partitionDir = fileNameTokens[fileNameTokens.length - 4];
+        //exclude temporary datasets resources
+        if (!partitionDir.equals(StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)) {
+            filesToBeReplicated.clear();
+            filesToBeReplicated.add(filePath);
+            AsterixReplicationJob job = new AsterixReplicationJob(ReplicationJobType.METADATA, operation,
+                    ReplicationExecutionType.SYNC, filesToBeReplicated);
+            try {
+                replicationManager.submitJob(job);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 
@@ -454,14 +465,14 @@
      */
     public static String getResourceRelativePath(String resourceAbsolutePath) {
         String[] tokens = resourceAbsolutePath.split(File.separator);
-        //partiton/dataverse/idx/fileName
+        //partition/dataverse/idx/fileName
         return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
                 + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
     }
 
     public static int getResourcePartition(String resourceAbsolutePath) {
         String[] tokens = resourceAbsolutePath.split(File.separator);
-        //partiton/dataverse/idx/fileName
-        return StoragePathUtil.getPartitonNumFromName(tokens[tokens.length - 4]);
+        //partition/dataverse/idx/fileName
+        return StoragePathUtil.getPartitionNumFromName(tokens[tokens.length - 4]);
     }
 }