[ASTERIXDB-3217][CLUS] Support pause/resume for cloud deployment
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Upon booting, the cluster's partitions will be restored
according to the configured caching policy (either
lazily or eagerly).
- Assume the cluster state to be HEALTHY when the txn
checkpoint is missing in a cloud deployment.
- This patch also includes some minor fixes
Change-Id: I271261c7ba2a4164babb7d4f79aff75bd5401595
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17586
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Al Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 0611545..56f794b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -30,7 +30,8 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.cloud.CloudIOManager;
+import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.LocalPartitionBootstrapper;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.ICoordinationService;
@@ -40,6 +41,7 @@
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IReceptionistFactory;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CloudProperties;
@@ -95,7 +97,6 @@
import org.apache.hyracks.client.result.ResultSet;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
@@ -166,6 +167,7 @@
private IConfigValidator configValidator;
private IDiskWriteRateLimiterProvider diskWriteRateLimiterProvider;
private final CloudProperties cloudProperties;
+ private IPartitionBootstrapper partitionBootstrapper;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
IPropertiesFactory propertiesFactory) {
@@ -194,9 +196,11 @@
boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
if (isCloudDeployment()) {
- persistenceIOManager = new CloudIOManager((IOManager) ioManager, cloudProperties);
+ persistenceIOManager = CloudManagerProvider.createIOManager(cloudProperties, ioManager);
+ partitionBootstrapper = CloudManagerProvider.getCloudPartitionBootstrapper(persistenceIOManager);
} else {
persistenceIOManager = ioManager;
+ partitionBootstrapper = new LocalPartitionBootstrapper(ioManager);
}
int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
threadExecutor =
@@ -669,4 +673,9 @@
public CloudProperties getCloudProperties() {
return cloudProperties;
}
+
+ @Override
+ public IPartitionBootstrapper getPartitionBootstrapper() {
+ return partitionBootstrapper;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 2ca3fbc..f6eb123 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -135,10 +135,7 @@
//read checkpoint file
Checkpoint checkpointObject = checkpointManager.getLatest();
if (checkpointObject == null) {
- //The checkpoint file doesn't exist => Failure happened during NC initialization.
- //Retry to initialize the NC by setting the state to PERMANENT_DATA_LOSS
- state = SystemState.PERMANENT_DATA_LOSS;
- LOGGER.info("The checkpoint file doesn't exist: systemState = PERMANENT_DATA_LOSS");
+ state = appCtx.getPartitionBootstrapper().getSystemStateOnMissingCheckpoint();
return state;
}
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index d214088..a44a695 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -21,9 +21,10 @@
import java.util.Arrays;
import java.util.Set;
-import org.apache.asterix.cloud.CloudIOManager;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -35,28 +36,36 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 1L;
- private final Set<Integer> partitions;
+ private final Set<Integer> storagePartitions;
+ private final boolean metadataNode;
+ private final int metadataPartitionId;
+ private final boolean cleanup;
- public CloudToLocalStorageCachingTask(Set<Integer> partitions) {
- this.partitions = partitions;
+ public CloudToLocalStorageCachingTask(Set<Integer> storagePartitions, boolean metadataNode, int metadataPartitionId,
+ boolean cleanup) {
+ this.storagePartitions = storagePartitions;
+ this.metadataNode = metadataNode;
+ this.metadataPartitionId = metadataPartitionId;
+ this.cleanup = cleanup;
}
@Override
public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext applicationContext = (INcApplicationContext) cs.getApplicationContext();
+ PersistentLocalResourceRepository lrs =
+ (PersistentLocalResourceRepository) applicationContext.getLocalResourceRepository();
String nodeId = applicationContext.getServiceContext().getNodeId();
- LOGGER.info("Syncing cloud to local storage for node {}. for partitions: {}", nodeId, partitions);
+ LOGGER.info("Initializing Node {} with storage partitions: {}", nodeId, storagePartitions);
- CloudIOManager cloudIOManager = (CloudIOManager) applicationContext.getPersistenceIoManager();
-
- // TODO(htowaileb): eager caching is disabled for now as it depends on static partitioning work
- cloudIOManager.syncFiles(partitions);
+ IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
+ bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId,
+ cleanup);
}
@Override
public String toString() {
return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"partitions\" : "
- + Arrays.toString(partitions.toArray()) + " }";
+ + Arrays.toString(storagePartitions.toArray()) + " }";
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 449dd27..11a85ba 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -35,6 +35,7 @@
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
+import org.apache.asterix.app.nc.task.CloudToLocalStorageCachingTask;
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
@@ -48,9 +49,11 @@
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
@@ -83,7 +86,7 @@
private final boolean replicationEnabled;
private final IGatekeeper gatekeeper;
Map<String, Map<String, Object>> nodeSecretsMap;
- private ICCServiceContext serviceContext;
+ private final ICCServiceContext serviceContext;
public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) {
this.serviceContext = serviceCtx;
@@ -219,6 +222,8 @@
Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode);
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
+ // Add any cloud-related tasks
+ addCloudTasks(tasks, nodeActivePartitions, metadataNode, metadataPartitionId);
tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
if (state == SystemState.CORRUPTED) {
// need to perform local recovery for node active partitions
@@ -251,6 +256,19 @@
return tasks;
}
+ protected void addCloudTasks(List<INCLifecycleTask> tasks, Set<Integer> computePartitions, boolean metadataNode,
+ int metadataPartitionId) {
+ IApplicationContext appCtx = (IApplicationContext) serviceContext.getApplicationContext();
+ if (!appCtx.isCloudDeployment()) {
+ return;
+ }
+
+ StorageComputePartitionsMap map = clusterManager.getStorageComputeMap();
+ map = map == null ? StorageComputePartitionsMap.computePartitionsMap(clusterManager) : map;
+ Set<Integer> storagePartitions = map.getStoragePartitions(computePartitions);
+ tasks.add(new CloudToLocalStorageCachingTask(storagePartitions, metadataNode, metadataPartitionId, false));
+ }
+
private synchronized void process(MetadataNodeResponseMessage response) throws HyracksDataException {
// rebind metadata node since it might be changing
MetadataManager.INSTANCE.rebindMetadataNode();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
index eb2b05b..47c0599 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudStorageIntegrationUtil.java
@@ -27,10 +27,11 @@
public static final String CONFIG_FILE = joinPath(RESOURCES_PATH, "cc-cloud-storage.conf");
public static void main(String[] args) throws Exception {
- CloudUtils.startS3CloudEnvironment();
+ boolean cleanStart = Boolean.getBoolean("cleanup.start");
+ LocalCloudUtil.startS3CloudEnvironment(cleanStart);
final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
try {
- integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"),
+ integrationUtil.run(cleanStart, Boolean.getBoolean("cleanup.shutdown"),
System.getProperty("external.lib", ""), CONFIG_FILE);
} catch (Exception e) {
LOGGER.fatal("Unexpected exception", e);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtil.java
similarity index 81%
rename from asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudUtils.java
rename to asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtil.java
index 352dc06..39af667 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/CloudUtils.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtil.java
@@ -18,8 +18,12 @@
*/
package org.apache.asterix.api.common;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.File;
import java.net.URI;
+import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -30,7 +34,7 @@
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-public class CloudUtils {
+public class LocalCloudUtil {
private static final Logger LOGGER = LogManager.getLogger();
@@ -38,20 +42,26 @@
public static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
public static final String CLOUD_STORAGE_BUCKET = "cloud-storage-container";
public static final String MOCK_SERVER_REGION = "us-west-2";
+ private static final String MOCK_FILE_BACKEND = joinPath("target", "s3mock");
private static S3Mock s3MockServer;
- private CloudUtils() {
+ private LocalCloudUtil() {
throw new AssertionError("Do not instantiate");
}
public static void main(String[] args) {
- startS3CloudEnvironment();
+ // Passing 'true' deletes the "s3mock" folder on start; change to 'false' to keep it
+ startS3CloudEnvironment(true);
}
- public static void startS3CloudEnvironment() {
+ public static void startS3CloudEnvironment(boolean cleanStart) {
+ if (cleanStart) {
+ FileUtils.deleteQuietly(new File(MOCK_FILE_BACKEND));
+ }
// Starting S3 mock server to be used instead of real S3 server
LOGGER.info("Starting S3 mock server");
- s3MockServer = new S3Mock.Builder().withPort(MOCK_SERVER_PORT).withInMemoryBackend().build();
+ // Use file backend for debugging/inspection
+ s3MockServer = new S3Mock.Builder().withPort(MOCK_SERVER_PORT).withFileBackend(MOCK_FILE_BACKEND).build();
shutdownSilently();
try {
s3MockServer.start();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
index f3fd805..fd789e8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageTest.java
@@ -20,7 +20,7 @@
import java.util.Collection;
-import org.apache.asterix.api.common.CloudUtils;
+import org.apache.asterix.api.common.LocalCloudUtil;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.test.runtime.LangExecutionUtil;
@@ -53,7 +53,7 @@
@BeforeClass
public static void setUp() throws Exception {
- CloudUtils.startS3CloudEnvironment();
+ LocalCloudUtil.startS3CloudEnvironment(true);
LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
index 6ffa3c0..7d0b06b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage.conf
@@ -62,8 +62,10 @@
messaging.frame.count=512
cloud.deployment=true
storage.buffercache.pagesize=32KB
+storage.partitioning=static
cloud.storage.scheme=s3
cloud.storage.bucket=cloud-storage-container
cloud.storage.region=us-west-2
cloud.storage.endpoint=http://127.0.0.1:8001
-cloud.storage.anonymous.auth=true
\ No newline at end of file
+cloud.storage.anonymous.auth=true
+cloud.storage.cache.policy=lazy
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 74a17e0..a6ab2e0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -12,6 +12,7 @@
"cloud.deployment" : false,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
+ "cloud.storage.cache.policy" : "lazy",
"cloud.storage.endpoint" : "",
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index a61c0dd..395c96c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -12,6 +12,7 @@
"cloud.deployment" : false,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
+ "cloud.storage.cache.policy" : "lazy",
"cloud.storage.endpoint" : "",
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index fa3f5fa..8396311 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -12,6 +12,7 @@
"cloud.deployment" : false,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
+ "cloud.storage.cache.policy" : "lazy",
"cloud.storage.endpoint" : "",
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
new file mode 100644
index 0000000..1c06919
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import static org.apache.asterix.common.utils.StorageConstants.METADATA_PARTITION;
+import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.CloudClientProvider;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractCloudIOManager extends IOManager implements IPartitionBootstrapper {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final String DATAVERSE_PATH =
+ FileUtil.joinPath(STORAGE_ROOT_DIR_NAME, PARTITION_DIR_PREFIX + METADATA_PARTITION, "Metadata");
+ protected final ICloudClient cloudClient;
+ protected final WriteBufferProvider writeBufferProvider;
+ protected final String bucket;
+ protected final Set<Integer> partitions;
+ protected final List<FileReference> partitionPaths;
+ protected final IOManager localIoManager;
+
+ public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
+ super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(),
+ ioManager.getQueueSize());
+ this.bucket = cloudProperties.getStorageBucket();
+ cloudClient = CloudClientProvider.getClient(cloudProperties);
+ int numOfThreads = getIODevices().size() * getIOParallelism();
+ writeBufferProvider = new WriteBufferProvider(numOfThreads);
+ partitions = new HashSet<>();
+ partitionPaths = new ArrayList<>();
+ this.localIoManager = ioManager;
+ }
+
+ /*
+ * ******************************************************************
+ * IPartitionBootstrapper functions
+ * ******************************************************************
+ */
+
+ @Override
+ public IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() {
+ if (cloudClient.listObjects(bucket, DATAVERSE_PATH, IoUtil.NO_OP_FILTER).isEmpty()) {
+ LOGGER.info("First time to initialize this cluster: systemState = PERMANENT_DATA_LOSS");
+ return IRecoveryManager.SystemState.PERMANENT_DATA_LOSS;
+ } else {
+ LOGGER.info("Resuming a previous initialized cluster: systemState = HEALTHY");
+ return IRecoveryManager.SystemState.HEALTHY;
+ }
+ }
+
+ @Override
+ public final void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions,
+ boolean metadataNode, int metadataPartition, boolean cleanup) throws HyracksDataException {
+ partitions.clear();
+ partitions.addAll(activePartitions);
+ if (metadataNode) {
+ partitions.add(metadataPartition);
+ }
+
+ partitionPaths.clear();
+ for (Integer partition : activePartitions) {
+ String partitionDir = PARTITION_DIR_PREFIX + partition;
+ partitionPaths.add(resolve(STORAGE_ROOT_DIR_NAME + File.separator + partitionDir));
+ }
+
+ LOGGER.warn("Initializing cloud manager with storage partitions: {}", partitions);
+
+ if (cleanup) {
+ deleteUnkeptPartitionDirs(currentOnDiskPartitions);
+ cleanupLocalFiles();
+ }
+ // Has different implementations depending on the caching policy
+ downloadPartitions();
+ }
+
+ private void deleteUnkeptPartitionDirs(List<FileReference> currentOnDiskPartitions) throws HyracksDataException {
+ for (FileReference partitionDir : currentOnDiskPartitions) {
+ int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(partitionDir.getRelativePath());
+ if (!partitions.contains(partitionNum)) {
+ LOGGER.warn("Deleting storage partition {} as it does not belong to the current storage partitions {}",
+ partitionNum, partitions);
+ localIoManager.deleteDirectory(partitionDir);
+ }
+ }
+ }
+
+ private void cleanupLocalFiles() throws HyracksDataException {
+ Set<String> cloudFiles = cloudClient.listObjects(bucket, STORAGE_ROOT_DIR_NAME, IoUtil.NO_OP_FILTER);
+ if (cloudFiles.isEmpty()) {
+ for (FileReference partitionPath : partitionPaths) {
+ if (localIoManager.exists(partitionPath)) {
+ // Clean local dir from all files
+ localIoManager.cleanDirectory(partitionPath);
+ }
+ }
+ } else {
+ for (FileReference partitionPath : partitionPaths) {
+ CloudFileUtil.cleanDirectoryFiles(localIoManager, cloudFiles, partitionPath);
+ }
+ }
+ }
+
+ protected abstract void downloadPartitions() throws HyracksDataException;
+
+ /*
+ * ******************************************************************
+ * IIOManager functions
+ * ******************************************************************
+ */
+
+ @Override
+ public boolean exists(FileReference fileRef) throws HyracksDataException {
+ return localIoManager.exists(fileRef) || cloudClient.exists(bucket, fileRef.getRelativePath());
+ }
+
+ @Override
+ public final IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+ throws HyracksDataException {
+ CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, fileRef, writeBufferProvider);
+ onOpen(fHandle, rwMode, syncMode);
+ try {
+ fHandle.open(rwMode, syncMode);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ return fHandle;
+ }
+
+ /**
+ * Action required to do when opening a file
+ *
+ * @param fileHandle file to open
+ */
+ protected abstract void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode)
+ throws HyracksDataException;
+
+ @Override
+ public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray)
+ throws HyracksDataException {
+ long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
+ CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
+ try {
+ inputStream.write(dataArray[0], dataArray[1]);
+ } catch (HyracksDataException e) {
+ inputStream.abort();
+ throw e;
+ }
+ return writtenBytes;
+ }
+
+ @Override
+ public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer dataArray) throws HyracksDataException {
+ int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
+ CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
+ try {
+ inputStream.write(dataArray);
+ } catch (HyracksDataException e) {
+ inputStream.abort();
+ throw e;
+ }
+ return writtenBytes;
+ }
+
+ @Override
+ public final void delete(FileReference fileRef) throws HyracksDataException {
+ if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+ // Never delete the storage dir in cloud storage
+ cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+ }
+ localIoManager.delete(fileRef);
+ }
+
+ @Override
+ public final void close(IFileHandle fHandle) throws HyracksDataException {
+ try {
+ CloudFileHandle cloudFileHandle = (CloudFileHandle) fHandle;
+ cloudFileHandle.close();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public final void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
+ HyracksDataException savedEx = null;
+ if (metadata) {
+ // only finish writing if metadata == true to prevent write limiter from finishing the stream and
+ // completing the upload.
+ CloudResettableInputStream stream = ((CloudFileHandle) fileHandle).getInputStream();
+ try {
+ stream.finish();
+ } catch (HyracksDataException e) {
+ savedEx = e;
+ }
+
+ if (savedEx != null) {
+ try {
+ stream.abort();
+ } catch (HyracksDataException e) {
+ savedEx.addSuppressed(e);
+ }
+ throw savedEx;
+ }
+ }
+ // Sync only after finalizing the upload to cloud storage
+ localIoManager.sync(fileHandle, metadata);
+ }
+
+ @Override
+ public final void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
+ // Write here will overwrite the older object if exists
+ cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
+ localIoManager.overwrite(fileRef, bytes);
+ }
+
+ @Override
+ public final void deleteDirectory(FileReference fileRef) throws HyracksDataException {
+ if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
+ // Never delete the storage dir in cloud storage
+ cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+ }
+ localIoManager.deleteDirectory(fileRef);
+ }
+
+ @Override
+ public final void create(FileReference fileRef) throws HyracksDataException {
+ // We need to delete the local file on create as the cloud storage didn't complete the upload
+ // In other words, both cloud files and the local files are not in sync
+ localIoManager.delete(fileRef);
+ localIoManager.create(fileRef);
+ }
+
+ @Override
+ public final void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
+ cloudClient.copy(bucket, srcFileRef.getRelativePath(), destFileRef);
+ localIoManager.copyDirectory(srcFileRef, destFileRef);
+ }
+
+ // TODO(htowaileb): the localIoManager is closed by the node controller service as well, check if we need this
+ @Override
+ public final void close() throws IOException {
+ cloudClient.close();
+ super.close();
+ localIoManager.close();
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
index 24cb0bb..8572014 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java
@@ -43,6 +43,12 @@
}
}
+ @Override
+ public synchronized void close() throws IOException {
+ inputStream.close();
+ super.close();
+ }
+
public CloudResettableInputStream getInputStream() {
return inputStream;
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
deleted file mode 100644
index 4939ab5..0000000
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudIOManager.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.cloud;
-
-import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
-import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.asterix.cloud.clients.CloudClientProvider;
-import org.apache.asterix.cloud.clients.ICloudClient;
-import org.apache.asterix.cloud.clients.aws.s3.S3Utils;
-import org.apache.asterix.common.config.CloudProperties;
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileDeviceResolver;
-import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.api.util.IoUtil;
-import org.apache.hyracks.control.nc.io.FileHandle;
-import org.apache.hyracks.control.nc.io.IOManager;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-public class CloudIOManager extends IOManager {
- private static final Logger LOGGER = LogManager.getLogger();
- private final ICloudClient cloudClient;
- private final WriteBufferProvider writeBufferProvider;
- private final String bucket;
- private IOManager localIoManager;
-
- private CloudIOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism,
- int queueSize, CloudProperties cloudProperties) throws HyracksDataException {
- super(devices, deviceComputer, ioParallelism, queueSize);
- this.bucket = cloudProperties.getStorageBucket();
- cloudClient = CloudClientProvider.getClient(cloudProperties);
- int numOfThreads = getIODevices().size() * getIoParallelism();
- writeBufferProvider = new WriteBufferProvider(numOfThreads);
- }
-
- public CloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
- this(ioManager.getIoDevices(), ioManager.getDeviceComputer(), ioManager.getIoParallelism(),
- ioManager.getQueueSize(), cloudProperties);
- this.localIoManager = ioManager;
- }
-
- public String getBucket() {
- return bucket;
- }
-
- @Override
- public long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException {
- long writtenBytes = super.doSyncWrite(fHandle, offset, dataArray);
- CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
- try {
- inputStream.write(dataArray[0], dataArray[1]);
- } catch (HyracksDataException e) {
- inputStream.abort();
- throw e;
- }
- return writtenBytes;
- }
-
- @Override
- public int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer dataArray) throws HyracksDataException {
- int writtenBytes = super.doSyncWrite(fHandle, offset, dataArray);
- CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream();
- try {
- inputStream.write(dataArray);
- } catch (HyracksDataException e) {
- inputStream.abort();
- throw e;
- }
- return writtenBytes;
- }
-
- @Override
- public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
- throws HyracksDataException {
-
- CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, fileRef, writeBufferProvider);
- if (!super.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
- ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
- try {
- LOGGER.info("Downloading {} from S3..", fileRef.getRelativePath());
- downloadFile(fHandle, rwMode, syncMode, writeBuffer);
- super.close(fHandle);
- LOGGER.info("Finished downloading {} from S3..", fileRef.getRelativePath());
- } finally {
- writeBufferProvider.recycle(writeBuffer);
- }
- }
-
- try {
- fHandle.open(rwMode, syncMode);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- return fHandle;
- }
-
- @Override
- public void delete(FileReference fileRef) throws HyracksDataException {
- if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
- // Never delete the storage dir in cloud storage
- cloudClient.deleteObject(bucket, fileRef.getRelativePath());
- }
- super.delete(fileRef);
- }
-
- @Override
- public void close(IFileHandle fHandle) throws HyracksDataException {
- try {
- ((CloudFileHandle) fHandle).getInputStream().close();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- super.close(fHandle);
- }
-
- // TODO This method should not do any syncing. It simply should list the files
- @Override
- public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
- Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
- if (cloudFiles.isEmpty()) {
- // TODO(htowaileb): Can we end up in a state where local has files but cloud does not?
- return Collections.emptySet();
- }
-
- // First get the set of local files
- Set<FileReference> localFiles = super.list(dir, filter);
- Iterator<FileReference> localFilesIter = localFiles.iterator();
-
- // Reconcile local files and cloud files
- while (localFilesIter.hasNext()) {
- FileReference file = localFilesIter.next();
- if (file.getFile().isDirectory()) {
- continue;
- }
- String path = S3Utils.toCloudPrefix(file.getRelativePath());
- if (!cloudFiles.contains(path)) {
- // Delete local files that do not exist in cloud storage (the ground truth for valid files)
- localFilesIter.remove();
- super.delete(file);
- } else {
- // No need to re-add it in the following loop
- cloudFiles.remove(path);
- }
- }
-
- // Add the remaining files that are not stored locally (if any)
- for (String cloudFile : cloudFiles) {
- localFiles.add(new FileReference(dir.getDeviceHandle(),
- cloudFile.substring(cloudFile.indexOf(dir.getRelativePath()))));
- }
- return new HashSet<>(localFiles);
- }
-
- @Override
- public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
- HyracksDataException savedEx = null;
- if (metadata) {
- // only finish writing if metadata == true to prevent write limiter from finishing the stream and
- // completing the upload.
- CloudResettableInputStream stream = ((CloudFileHandle) fileHandle).getInputStream();
- try {
- stream.finish();
- } catch (HyracksDataException e) {
- savedEx = e;
- }
-
- if (savedEx != null) {
- try {
- stream.abort();
- } catch (HyracksDataException e) {
- savedEx.addSuppressed(e);
- }
- throw savedEx;
- }
- }
- // Sync only after finalizing the upload to cloud storage
- super.sync(fileHandle, metadata);
- }
-
- @Override
- public long getSize(IFileHandle fileHandle) throws HyracksDataException {
- if (!fileHandle.getFileReference().getFile().exists()) {
- return cloudClient.getObjectSize(bucket, fileHandle.getFileReference().getRelativePath());
- }
- return super.getSize(fileHandle);
- }
-
- @Override
- public long getSize(FileReference fileReference) throws HyracksDataException {
- if (!fileReference.getFile().exists()) {
- return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
- }
- return super.getSize(fileReference);
- }
-
- @Override
- public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
- super.overwrite(fileRef, bytes);
- // Write here will overwrite the older object if exists
- cloudClient.write(bucket, fileRef.getRelativePath(), bytes);
- }
-
- @Override
- public int doSyncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
- return super.doSyncRead(fHandle, offset, data);
- }
-
- @Override
- public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
- if (!fileRef.getFile().exists()) {
- IFileHandle open = open(fileRef, FileReadWriteMode.READ_WRITE, FileSyncMode.METADATA_SYNC_DATA_SYNC);
- fileRef = open.getFileReference();
- }
- return super.readAllBytes(fileRef);
- }
-
- @Override
- public void deleteDirectory(FileReference fileRef) throws HyracksDataException {
- // TODO(htowaileb): Should we delete the cloud first?
- super.deleteDirectory(fileRef);
- if (!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath()))) {
- // Never delete the storage dir in cloud storage
- cloudClient.deleteObject(bucket, fileRef.getRelativePath());
- }
- }
-
- @Override
- public boolean exists(FileReference fileRef) throws HyracksDataException {
- // Check if the file exists locally first as newly created files (i.e., they are empty) are not stored in cloud storage
- return fileRef.getFile().exists() || cloudClient.exists(bucket, fileRef.getRelativePath());
- }
-
- @Override
- public void create(FileReference fileRef) throws HyracksDataException {
- // We need to delete the local file on create as the cloud storage didn't complete the upload
- // In other words, both cloud files and the local files are not in sync
- super.delete(fileRef);
- super.create(fileRef);
- }
-
- @Override
- public void copyDirectory(FileReference srcFileRef, FileReference destFileRef) throws HyracksDataException {
- cloudClient.copy(bucket, srcFileRef.getRelativePath(), destFileRef);
- super.copyDirectory(srcFileRef, destFileRef);
- }
-
- protected long writeLocally(IFileHandle fHandle, long offset, ByteBuffer buffer) throws HyracksDataException {
- return super.doSyncWrite(fHandle, offset, buffer);
- }
-
- protected void syncLocally(IFileHandle fileHandle) throws HyracksDataException {
- super.sync(fileHandle, true);
- }
-
- @Override
- public void syncFiles(Set<Integer> activePartitions) throws HyracksDataException {
- Map<String, String> cloudToLocalStoragePaths = new HashMap<>();
- for (Integer partition : activePartitions) {
- String partitionToFind = PARTITION_DIR_PREFIX + partition + "/";
- IODeviceHandle deviceHandle = getDeviceComputer().resolve(partitionToFind, getIODevices());
-
- String cloudStoragePath = STORAGE_ROOT_DIR_NAME + "/" + partitionToFind;
- String localStoragePath = deviceHandle.getMount().getAbsolutePath() + "/" + cloudStoragePath;
- cloudToLocalStoragePaths.put(cloudStoragePath, localStoragePath);
- }
- LOGGER.info("Resolved paths to io devices: {}", cloudToLocalStoragePaths);
- cloudClient.syncFiles(bucket, cloudToLocalStoragePaths);
- }
-
- // TODO(htowaileb): the localIoManager is closed by the node controller service as well, check if we need this
- @Override
- public void close() throws IOException {
- cloudClient.close();
- super.close();
- localIoManager.close();
- }
-
- private void downloadFile(FileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode,
- ByteBuffer writeBuffer) throws HyracksDataException {
- FileReference fileRef = fileHandle.getFileReference();
- File file = fileRef.getFile();
-
- try (InputStream inputStream = cloudClient.getObjectStream(bucket, fileRef.getRelativePath())) {
- FileUtils.createParentDirectories(file);
- if (!file.createNewFile()) {
- throw new IllegalStateException("Couldn't create local file");
- }
-
- fileHandle.open(rwMode, syncMode);
- writeToFile(fileHandle, inputStream, writeBuffer);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
-
- private void writeToFile(IFileHandle fileHandle, InputStream inStream, ByteBuffer writeBuffer)
- throws HyracksDataException {
- writeBuffer.clear();
- try {
- int position = 0;
- long offset = 0;
- int read;
- while ((read = inStream.read(writeBuffer.array(), position, writeBuffer.remaining())) >= 0) {
- position += read;
- writeBuffer.position(position);
- if (writeBuffer.remaining() == 0) {
- offset += writeBufferToFile(fileHandle, writeBuffer, offset);
- position = 0;
- }
- }
-
- if (writeBuffer.position() > 0) {
- writeBufferToFile(fileHandle, writeBuffer, offset);
- syncLocally(fileHandle);
- }
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
-
- private long writeBufferToFile(IFileHandle fileHandle, ByteBuffer writeBuffer, long offset)
- throws HyracksDataException {
- writeBuffer.flip();
- long written = writeLocally(fileHandle, offset, writeBuffer);
- writeBuffer.clear();
- return written;
- }
-}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
new file mode 100644
index 0000000..6ba31db
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+/**
+ * Static entry points for obtaining the cloud-backed IO manager and its partition bootstrapper view.
+ */
+public class CloudManagerProvider {
+ private CloudManagerProvider() {
+ }
+
+ /**
+ * Wraps the given IO manager in the cloud IO manager matching the configured cache policy:
+ * {@link LazyCloudIOManager} for {@link CloudCachePolicy#LAZY}, {@link EagerCloudIOManager} otherwise.
+ *
+ * @param cloudProperties the cloud configuration, including the cache policy
+ * @param ioManager the IO manager to wrap; it is cast to {@link IOManager}
+ * @return the cloud IO manager
+ * @throws HyracksDataException if the cloud IO manager cannot be constructed
+ */
+ public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager)
+ throws HyracksDataException {
+ IOManager localIoManager = (IOManager) ioManager;
+ boolean lazyCaching = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY;
+ return lazyCaching ? new LazyCloudIOManager(localIoManager, cloudProperties)
+ : new EagerCloudIOManager(localIoManager, cloudProperties);
+ }
+
+ /**
+ * Returns the given IO manager viewed as a partition bootstrapper.
+ *
+ * @param ioManager an IO manager that must be an {@link AbstractCloudIOManager}
+ * @return the same object, cast to {@link IPartitionBootstrapper}
+ * @throws IllegalArgumentException if {@code ioManager} is not an {@link AbstractCloudIOManager}
+ */
+ public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) {
+ if (ioManager instanceof AbstractCloudIOManager) {
+ return (IPartitionBootstrapper) ioManager;
+ }
+ throw new IllegalArgumentException("Not a cloud IOManager");
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
new file mode 100644
index 0000000..0c61957
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_NAME;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Cloud IO manager with eager caching: partitions are synced from cloud storage up front, so nothing
+ * has to be fetched when a file is opened.
+ * {@link org.apache.hyracks.api.io.IIOManager} methods are implemented either in
+ * - {@link IOManager}
+ * OR
+ * - {@link AbstractCloudIOManager}
+ */
+class EagerCloudIOManager extends AbstractCloudIOManager {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
+ super(ioManager, cloudProperties);
+ }
+
+ /*
+ * ******************************************************************
+ * AbstractCloudIOManager functions
+ * ******************************************************************
+ */
+
+ /**
+ * Builds a map from each partition's cloud path (storage root + "/" + partition directory name) to the
+ * partition's absolute local path and hands it to the cloud client to sync the files.
+ */
+ @Override
+ protected void downloadPartitions() throws HyracksDataException {
+ // TODO currently it throws an error in local test
+ Map<String, String> localPathByCloudPath = new HashMap<>();
+ for (FileReference partition : partitionPaths) {
+ localPathByCloudPath.put(STORAGE_ROOT_DIR_NAME + "/" + partition.getName(), partition.getAbsolutePath());
+ }
+ LOGGER.info("Resolved paths to io devices: {}", localPathByCloudPath);
+ cloudClient.syncFiles(bucket, localPathByCloudPath);
+ }
+
+ /**
+ * Intentionally empty: this manager does no per-file work on open.
+ */
+ @Override
+ protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode) {
+ // NoOp
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
new file mode 100644
index 0000000..2232dcd
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import static org.apache.asterix.common.utils.StorageConstants.PARTITION_DIR_PREFIX;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.cloud.util.CloudFileUtil;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * CloudIOManager with lazy caching
+ * - Overrides some of {@link IOManager} functions
+ * - Nothing is downloaded up front; a file is fetched from cloud storage the first time it is opened or read
+ * while missing locally
+ */
+class LazyCloudIOManager extends AbstractCloudIOManager {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties) throws HyracksDataException {
+ super(ioManager, cloudProperties);
+ }
+
+ /*
+ * ******************************************************************
+ * AbstractCloudIOManager functions
+ * ******************************************************************
+ */
+
+ /**
+ * Intentionally empty: with lazy caching no partition is downloaded ahead of time.
+ */
+ @Override
+ protected void downloadPartitions() {
+ // NoOp
+ }
+
+ /**
+ * Downloads the file behind {@code fileHandle} when it is missing locally but present in the bucket.
+ * After a successful download the handle is closed through the local IO manager. The borrowed write
+ * buffer is always returned to the provider.
+ *
+ * @throws HyracksDataException if the existence checks or the download fail
+ */
+ @Override
+ protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode)
+ throws HyracksDataException {
+ FileReference fileRef = fileHandle.getFileReference();
+ if (!localIoManager.exists(fileRef) && cloudClient.exists(bucket, fileRef.getRelativePath())) {
+ // File doesn't exist locally, download it.
+ ByteBuffer writeBuffer = writeBufferProvider.getBuffer();
+ try {
+ // TODO download for all partitions at once
+ LOGGER.info("Downloading {} from S3..", fileRef.getRelativePath());
+ CloudFileUtil.downloadFile(localIoManager, cloudClient, bucket, fileHandle, rwMode, syncMode,
+ writeBuffer);
+ localIoManager.close(fileHandle);
+ LOGGER.info("Finished downloading {} from S3..", fileRef.getRelativePath());
+ } finally {
+ writeBufferProvider.recycle(writeBuffer);
+ }
+ }
+ }
+
+ /*
+ * ******************************************************************
+ * IIOManager functions
+ * ******************************************************************
+ */
+
+ /**
+ * Lists the files under {@code dir}, using the cloud listing as the reference.
+ * An empty cloud listing yields an empty result without consulting the local drive. Otherwise every
+ * local file must also appear in the cloud listing. Cloud files that are missing locally are added
+ * when they belong to one of this node's partitions and resolve to the same device as {@code dir}.
+ *
+ * @throws IllegalStateException if a local file is not present in the cloud listing
+ */
+ @Override
+ public Set<FileReference> list(FileReference dir, FilenameFilter filter) throws HyracksDataException {
+ Set<String> cloudFiles = cloudClient.listObjects(bucket, dir.getRelativePath(), filter);
+ if (cloudFiles.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ // First get the set of local files
+ Set<FileReference> localFiles = localIoManager.list(dir, filter);
+
+ // Reconcile local files and cloud files
+ for (FileReference file : localFiles) {
+ String path = file.getRelativePath();
+ if (!cloudFiles.contains(path)) {
+ throw new IllegalStateException("Local file is not clean");
+ } else {
+ // No need to re-add it in the following loop
+ cloudFiles.remove(path);
+ }
+ }
+
+ // Add the remaining files that are not stored locally in their designated partitions (if any)
+ for (String cloudFile : cloudFiles) {
+ FileReference localFile = resolve(cloudFile);
+ if (isInNodePartition(cloudFile) && dir.getDeviceHandle().equals(localFile.getDeviceHandle())) {
+ localFiles.add(localFile);
+ }
+ }
+ return new HashSet<>(localFiles);
+ }
+
+ /**
+ * Returns the local size when the file exists locally, otherwise the size of the object in the bucket.
+ */
+ @Override
+ public long getSize(FileReference fileReference) throws HyracksDataException {
+ if (localIoManager.exists(fileReference)) {
+ return localIoManager.getSize(fileReference);
+ }
+ return cloudClient.getObjectSize(bucket, fileReference.getRelativePath());
+ }
+
+ /**
+ * Reads the whole file. A file that is missing locally and belongs to one of this node's partitions is
+ * read from the bucket and, when bytes were returned, also written to the local drive. Every other file
+ * is read through the local IO manager.
+ *
+ * @return the file content; may be {@code null} if the cloud client returns {@code null}
+ */
+ @Override
+ public byte[] readAllBytes(FileReference fileRef) throws HyracksDataException {
+ if (!localIoManager.exists(fileRef) && isInNodePartition(fileRef.getRelativePath())) {
+ byte[] bytes = cloudClient.readAllBytes(bucket, fileRef.getRelativePath());
+ if (bytes != null && !partitions.isEmpty()) {
+ localIoManager.overwrite(fileRef, bytes);
+ }
+ return bytes;
+ }
+ return localIoManager.readAllBytes(fileRef);
+ }
+
+ /**
+ * Tells whether {@code path} points into one of this node's partitions.
+ * The partition number is the run of digits that follows the first occurrence of the partition
+ * directory prefix; it must be terminated by '/', the platform separator, or the end of the path.
+ *
+ * @return {@code true} if the path names a partition contained in {@code partitions}; {@code false} if
+ * it names another partition or has no well-formed partition component at all
+ */
+ private boolean isInNodePartition(String path) {
+ int prefixIndex = path.indexOf(PARTITION_DIR_PREFIX);
+ if (prefixIndex < 0) {
+ // Not under any partition directory
+ return false;
+ }
+ int start = prefixIndex + PARTITION_DIR_PREFIX.length();
+ int end = start;
+ while (end < path.length() && path.charAt(end) >= '0' && path.charAt(end) <= '9') {
+ end++;
+ }
+ // Cloud paths use '/', local relative paths use the platform separator; a bare partition dir ends the path
+ boolean terminated =
+ end == path.length() || path.charAt(end) == '/' || path.charAt(end) == File.separatorChar;
+ if (end == start || !terminated) {
+ return false;
+ }
+ return partitions.contains(Integer.parseInt(path.substring(start, end)));
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
new file mode 100644
index 0000000..db7b6d6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * {@link IPartitionBootstrapper} that works purely through the supplied IO manager: a missing checkpoint
+ * is reported as permanent data loss, and bootstrapping only removes partitions that are no longer active.
+ */
+public class LocalPartitionBootstrapper implements IPartitionBootstrapper {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final IIOManager ioManager;
+
+ public LocalPartitionBootstrapper(IIOManager ioManager) {
+ this.ioManager = ioManager;
+ }
+
+ /**
+ * @return always {@link IRecoveryManager.SystemState#PERMANENT_DATA_LOSS}
+ */
+ @Override
+ public IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() {
+ // A missing checkpoint file means the failure happened during NC initialization.
+ // Reporting PERMANENT_DATA_LOSS makes the NC retry its initialization.
+ IRecoveryManager.SystemState stateOnMissingCheckpoint = IRecoveryManager.SystemState.PERMANENT_DATA_LOSS;
+ LOGGER.info("The checkpoint file doesn't exist: systemState = PERMANENT_DATA_LOSS");
+ return stateOnMissingCheckpoint;
+ }
+
+ /**
+ * Deletes every on-disk partition whose number is not among the active partitions.
+ * The {@code metadataNode}, {@code metadataPartition} and {@code cleanup} arguments are not used by
+ * this implementation.
+ *
+ * @throws HyracksDataException if deleting a stale partition fails
+ */
+ @Override
+ public void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions,
+ boolean metadataNode, int metadataPartition, boolean cleanup) throws HyracksDataException {
+ for (FileReference partitionDir : currentOnDiskPartitions) {
+ int partitionId = StoragePathUtil.getPartitionNumFromRelativePath(partitionDir.getAbsolutePath());
+ if (activePartitions.contains(partitionId)) {
+ // Still owned by this node, keep it
+ continue;
+ }
+ LOGGER.warn("deleting partition {} since it is not on partitions to keep {}", partitionId,
+ activePartitions);
+ ioManager.delete(partitionDir);
+ }
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
new file mode 100644
index 0000000..79ddba1
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/util/CloudFileUtil.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.FileHandle;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+/**
+ * Static helpers for moving files between cloud storage and the local drive.
+ */
+public class CloudFileUtil {
+ private CloudFileUtil() {
+ }
+
+ /**
+ * Downloads the object stored under the handle's relative path into a newly created local file.
+ * Parent directories are created as needed, the handle is opened with the given modes and the object
+ * content is streamed into it through {@code writeBuffer}. The handle is not closed by this method.
+ *
+ * @param ioManager the IO manager used to write and sync the local file
+ * @param cloudClient the client used to read the object
+ * @param bucket the bucket holding the object
+ * @param fileHandle the handle of the file to fill
+ * @param rwMode the read/write mode used to open the handle
+ * @param syncMode the sync mode used to open the handle
+ * @param writeBuffer the staging buffer; must be array-backed since its backing array is read into
+ * @throws IllegalStateException if the local file could not be newly created
+ * @throws HyracksDataException if reading the object or writing the local file fails
+ */
+ public static void downloadFile(IOManager ioManager, ICloudClient cloudClient, String bucket, FileHandle fileHandle,
+ IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode, ByteBuffer writeBuffer)
+ throws HyracksDataException {
+ FileReference fileRef = fileHandle.getFileReference();
+ File file = fileRef.getFile();
+
+ try (InputStream inputStream = cloudClient.getObjectStream(bucket, fileRef.getRelativePath())) {
+ FileUtils.createParentDirectories(file);
+ if (!file.createNewFile()) {
+ throw new IllegalStateException("Couldn't create local file");
+ }
+
+ fileHandle.open(rwMode, syncMode);
+ writeToFile(ioManager, fileHandle, inputStream, writeBuffer);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ /**
+ * Deletes the local files under {@code partitionPath} that are not listed in {@code cloudFiles}.
+ * Directories are skipped. Paths found both locally and in {@code cloudFiles} are removed from
+ * {@code cloudFiles}, so the caller's set is modified by this call.
+ *
+ * @param ioManager the IO manager used to list and delete local files
+ * @param cloudFiles relative paths of the files present in cloud storage; modified in place
+ * @param partitionPath the local directory to clean
+ * @throws HyracksDataException if listing or deleting local files fails
+ */
+ public static void cleanDirectoryFiles(IOManager ioManager, Set<String> cloudFiles, FileReference partitionPath)
+ throws HyracksDataException {
+ // First get the set of local files
+ Set<FileReference> localFiles = ioManager.list(partitionPath);
+ Iterator<FileReference> localFilesIter = localFiles.iterator();
+
+ // Reconcile local files and cloud files
+ while (localFilesIter.hasNext()) {
+ FileReference file = localFilesIter.next();
+ if (file.getFile().isDirectory()) {
+ continue;
+ }
+
+ String path = file.getRelativePath();
+ if (!cloudFiles.contains(path)) {
+ // Delete local files that do not exist in cloud storage (the ground truth for valid files)
+ localFilesIter.remove();
+ ioManager.delete(file);
+ } else {
+ // No need to re-add it in the following loop
+ cloudFiles.remove(path);
+ }
+ }
+
+ // Add the remaining files that are not stored locally (if any)
+ // NOTE(review): localFiles is never returned or read after this loop, so these additions look
+ // unobservable unless ioManager.list() hands out a shared set - confirm and drop if so
+ for (String cloudFile : cloudFiles) {
+ if (!cloudFile.contains(partitionPath.getRelativePath())) {
+ continue;
+ }
+ localFiles.add(new FileReference(partitionPath.getDeviceHandle(),
+ cloudFile.substring(cloudFile.indexOf(partitionPath.getRelativePath()))));
+ }
+ }
+
+ /**
+ * Streams {@code inStream} into the file, flushing {@code writeBuffer} to the file each time it fills
+ * up and once more for the remaining tail. The file is synced (data and metadata) whenever at least
+ * one byte was written, including when the content length is an exact multiple of the buffer capacity.
+ */
+ private static void writeToFile(IOManager ioManager, IFileHandle fileHandle, InputStream inStream,
+ ByteBuffer writeBuffer) throws HyracksDataException {
+ writeBuffer.clear();
+ try {
+ int position = 0;
+ long offset = 0;
+ int read;
+ while ((read = inStream.read(writeBuffer.array(), position, writeBuffer.remaining())) >= 0) {
+ position += read;
+ writeBuffer.position(position);
+ if (writeBuffer.remaining() == 0) {
+ offset += writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
+ position = 0;
+ }
+ }
+
+ if (writeBuffer.position() > 0) {
+ // Flush the partially filled tail
+ offset += writeBufferToFile(ioManager, fileHandle, writeBuffer, offset);
+ }
+ if (offset > 0) {
+ // Sync once after everything is written, regardless of whether a tail existed
+ ioManager.sync(fileHandle, true);
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ /**
+ * Writes the filled part of {@code writeBuffer} to the file at {@code offset} and resets the buffer.
+ *
+ * @return the number of bytes reported as written by the IO manager
+ */
+ private static long writeBufferToFile(IOManager ioManager, IFileHandle fileHandle, ByteBuffer writeBuffer,
+ long offset) throws HyracksDataException {
+ writeBuffer.flip();
+ long written = ioManager.doSyncWrite(fileHandle, offset, writeBuffer);
+ writeBuffer.clear();
+ return written;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 23487a9..8e6becf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -22,6 +22,7 @@
import java.rmi.RemoteException;
import java.util.concurrent.Executor;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
@@ -51,6 +52,8 @@
IIOManager getPersistenceIoManager();
+ IPartitionBootstrapper getPartitionBootstrapper();
+
Executor getThreadExecutor();
ITransactionSubsystem getTransactionSubsystem();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
new file mode 100644
index 0000000..c6858c5
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/CloudCachePolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cloud;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Cloud storage caching policies, each resolvable from its lower-case name via {@link #fromName(String)}. */
+public enum CloudCachePolicy {
+    EAGER("eager"),
+    LAZY("lazy");
+    private static final Map<String, CloudCachePolicy> POLICIES_BY_NAME = Collections.unmodifiableMap(
+            Arrays.stream(values()).collect(Collectors.toMap(CloudCachePolicy::getPolicyName, Function.identity())));
+    private final String policyName;
+
+    CloudCachePolicy(String policyName) {
+        this.policyName = policyName;
+    }
+
+    public String getPolicyName() {
+        return policyName;
+    }
+
+    /** Resolves a policy by name, ignoring case; throws IllegalArgumentException if the name is null or unknown. */
+    public static CloudCachePolicy fromName(String policyName) {
+        CloudCachePolicy policy = policyName == null ? null : POLICIES_BY_NAME.get(policyName.toLowerCase());
+        if (policy == null) {
+            throw new IllegalArgumentException("unknown cloud cache policy: " + policyName);
+        }
+        return policy;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
new file mode 100644
index 0000000..fd9a1b3
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cloud;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+
+/**
+ * Bootstraps a node's partition directories
+ */
+public interface IPartitionBootstrapper {
+
+ /**
+ * @return the system state to assume when {@link IRecoveryManager#getSystemState()} is called and
+ * the checkpoint file is missing
+ */
+ IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint();
+
+ /**
+ * Bootstraps the node's partitions directory by doing the following:
+ * - Deleting the directories in <code>currentOnDiskPartitions</code> that are not in <code>activePartitions</code>
+ * - In cloud deployment only, it also does the following:
+ * - - Cleans up all local files (i.e., deletes all local files that do not exist in the cloud storage)
+ * - - Depending on the caching policy:
+ * - - - Eager: downloads the contents of all active partitions
+ * - - - Lazy: downloads no file at start; a file is downloaded when it is opened
+ *
+ * @param activePartitions the current active storage partitions of the NC
+ * @param currentOnDiskPartitions paths to the current local partitions
+ * @param metadataNode whether the node is a metadata node as well
+ * @param metadataPartition metadata partition number
+ * @param cleanup whether to perform cleanup by deleting all unkept partitions
+ * @throws HyracksDataException if bootstrapping the partitions fails
+ */
+ void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions, boolean metadataNode,
+ int metadataPartition, boolean cleanup) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 5f714f4..343a23d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -160,8 +160,7 @@
/**
* Returns the IO devices configured for a Node Controller
*
- * @param nodeId
- * unique identifier of the Node Controller
+ * @param nodeId unique identifier of the Node Controller
* @return a list of IO devices.
*/
String[] getIODevices(String nodeId);
@@ -172,15 +171,13 @@
AlgebricksAbsolutePartitionConstraint getClusterLocations();
/**
- * @param excludePendingRemoval
- * true, if the desired set shouldn't have pending removal nodes
+ * @param excludePendingRemoval true, if the desired set shouldn't have pending removal nodes
* @return the set of participant nodes
*/
Set<String> getParticipantNodes(boolean excludePendingRemoval);
/**
- * @param node
- * the node id
+ * @param node the node id
* @return the number of partitions on that node
*/
int getNodePartitionsCount(String node);
@@ -262,6 +259,7 @@
/**
* Indicate whether one or more datasets must be rebalanced before the cluster becomes ACTIVE
+ *
* @param rebalanceRequired
*/
void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException;
@@ -283,13 +281,20 @@
/**
* Gets the count of storage partitions
+ *
* @return the count of storage partitions
*/
int getStoragePartitionsCount();
/**
+ * @return the current compute-storage partitions map
+ */
+ StorageComputePartitionsMap getStorageComputeMap();
+
+ /**
* Sets the compute-storage partitions map
- * @param map
+ *
+ * @param map the new map
*/
void setComputeStoragePartitionsMap(StorageComputePartitionsMap map);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
index 6561d05..a16204d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,7 +29,7 @@
public class StorageComputePartitionsMap {
- private final Map<Integer, ComputePartition> stoToComputeLocation = new HashMap<>();
+ private final Map<Integer, ComputePartition> storageComputeMap = new HashMap<>();
private final int storagePartitionsCount;
public StorageComputePartitionsMap(int storagePartitionsCount) {
@@ -36,7 +37,7 @@
}
public void addStoragePartition(int stoPart, ComputePartition compute) {
- stoToComputeLocation.put(stoPart, compute);
+ storageComputeMap.put(stoPart, compute);
}
public int[][] getComputeToStorageMap(boolean metadataDataset) {
@@ -63,8 +64,34 @@
return computerToStoArray;
}
+ public int getStoragePartitionsCount() {
+ return storagePartitionsCount;
+ }
+
public ComputePartition getComputePartition(int storagePartition) {
- return stoToComputeLocation.get(storagePartition);
+ return storageComputeMap.get(storagePartition);
+ }
+
+ public Set<String> getComputeNodes() {
+ return storageComputeMap.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
+ }
+
+    /**
+     * Resolves the storage partitions that belong to the given compute partitions. A storage partition
+     * is selected when the id of the compute partition it is mapped to is in <code>computePartitions</code>.
+     *
+     * @param computePartitions the current active compute partitions
+     * @return a newly created set of the selected storage partitions (empty if none is selected)
+     */
+    public Set<Integer> getStoragePartitions(Set<Integer> computePartitions) {
+        final Set<Integer> selected = new HashSet<>();
+        // keys are storage partitions; values are the compute partitions they are mapped to
+        storageComputeMap.forEach((storagePartition, compute) -> {
+            if (computePartitions.contains(compute.getId())) {
+                selected.add(storagePartition);
+            }
+        });
+        return selected;
}
public static StorageComputePartitionsMap computePartitionsMap(IClusterStateManager clusterStateManager) {
@@ -97,8 +124,4 @@
}
return newMap;
}
-
- public Set<String> getComputeNodes() {
- return stoToComputeLocation.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
- }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index a480167..90c3acb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -21,6 +21,7 @@
import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
@@ -37,7 +38,8 @@
CLOUD_STORAGE_PREFIX(STRING, ""),
CLOUD_STORAGE_REGION(STRING, ""),
CLOUD_STORAGE_ENDPOINT(STRING, ""),
- CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false);
+ CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
+ CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy");
private final IOptionType interpreter;
private final Object defaultValue;
@@ -56,6 +58,7 @@
case CLOUD_STORAGE_REGION:
case CLOUD_STORAGE_ENDPOINT:
case CLOUD_STORAGE_ANONYMOUS_AUTH:
+ case CLOUD_STORAGE_CACHE_POLICY:
return Section.COMMON;
default:
return Section.NC;
@@ -77,6 +80,10 @@
return "The cloud storage endpoint";
case CLOUD_STORAGE_ANONYMOUS_AUTH:
return "Indicates whether or not anonymous auth should be used for the cloud storage";
+ case CLOUD_STORAGE_CACHE_POLICY:
+ return "The caching policy (either eager or lazy). 'Eager' caching will download all partitions"
+ + " upon booting, whereas lazy caching will download a file upon request to open it."
+ + " (default: 'lazy')";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -117,4 +124,8 @@
public boolean isStorageAnonymousAuth() {
return accessor.getBoolean(Option.CLOUD_STORAGE_ANONYMOUS_AUTH);
}
+
+ public CloudCachePolicy getCloudCachePolicy() {
+ return CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 0128478..4b36b20 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -507,6 +507,7 @@
return appCtx.getStorageProperties().getStoragePartitionsCount();
}
+ @Override
public synchronized StorageComputePartitionsMap getStorageComputeMap() {
return storageComputePartitionsMap;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 10f1637..95dcb2f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -126,6 +126,7 @@
/**
* Lists the files matching {@code filter} recursively starting from {@code dir}
+ *
* @param dir
* @param filter
* @return the matching files
@@ -149,6 +150,4 @@
boolean makeDirectories(FileReference resourceDir);
void cleanDirectory(FileReference resourceDir) throws HyracksDataException;
-
- void syncFiles(Set<Integer> activePartitions) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
index c826c2f..c160f67 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
@@ -41,10 +41,8 @@
private final String workspace;
/**
- * @param mount
- * The device root
- * @param workspace
- * The relative workspace inside the device
+ * @param mount The device root
+ * @param workspace The relative workspace inside the device
*/
public IODeviceHandle(File mount, String workspace) {
this.mount = mount;
@@ -63,8 +61,7 @@
/**
* Create a file reference
*
- * @param relPath
- * the relative path
+ * @param relPath the relative path
* @return
*/
public FileReference createFileRef(String relPath) {
@@ -74,8 +71,7 @@
/**
* Get handles for IO devices
*
- * @param ioDevices
- * comma separated list of devices
+ * @param ioDevices comma separated list of devices
* @return
*/
public static List<IODeviceHandle> getDevices(String[] ioDevices) {
@@ -97,11 +93,13 @@
throw new HyracksDataException(
"Passed path: " + absolutePath + " is not inside the device " + mount.getAbsolutePath());
}
- return absolutePath.substring(mount.getAbsolutePath().length());
+ // +1 to remove the leading '/'
+ return absolutePath.substring(mount.getAbsolutePath().length() + 1);
}
/**
* determinea if the device contains a file with the passed relative path
+ *
* @param relPath
* @return true if it contains, false, otherwise
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 05e6544..fcf8c7a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -206,8 +206,7 @@
List<FileReference> files = new ArrayList<>();
String[] matchingFiles = root.getFile().list(filter);
if (matchingFiles != null) {
- files.addAll(Arrays.stream(matchingFiles).map(pDir -> new FileReference(root.getDeviceHandle(), pDir))
- .collect(Collectors.toList()));
+ files.addAll(Arrays.stream(matchingFiles).map(root::getChild).collect(Collectors.toList()));
}
return files;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 1733ad4..0e6578f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -115,14 +115,10 @@
return queueSize;
}
- public int getIoParallelism() {
+ public int getIOParallelism() {
return ioParallelism;
}
- public List<IODeviceHandle> getIoDevices() {
- return ioDevices;
- }
-
public IFileDeviceResolver getDeviceComputer() {
return deviceComputer;
}
@@ -529,7 +525,7 @@
}
@Override
- public void overwrite(FileReference fileRef, byte[] bytes) throws ClosedByInterruptException, HyracksDataException {
+ public void overwrite(FileReference fileRef, byte[] bytes) throws HyracksDataException {
File file = fileRef.getFile();
try {
if (file.exists()) {
@@ -596,9 +592,4 @@
throw HyracksDataException.create(e);
}
}
-
- @Override
- public void syncFiles(Set<Integer> activePartitions) throws HyracksDataException {
- // do nothing
- }
}