[NO ISSUE][STO] Adapt Storage Structure To Rebalance
- user model changes: no
- storage format changes: no
- interface changes: yes
-- Added IResource#setPath to use for the resource
storage migration.
Details:
- Unify storage structure to support dataset rebalance:
Old format:
./storage/partition_#/dataverse/datasetName_idx_indexName
New format:
./storage/partition_#/dataverse/datasetName/rebalanaceNum/indexName
- Adapt recovery and replication to new storage structure.
- Add old structure -> new structure NC migration task.
- Add CompatibilityUtil to ensure NC can be upgraded during
NC startup.
- Centralize the logic for parsing file path to its components in
ResourceReference/DatasetResourceReference.
- Add storage structure migration test case.
- Add test case for recovery after rebalance.
Change-Id: I0f968b9f493bf5aa2d49f503afe21f0d438bb7f0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2181
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 19966fe..e29e3fe 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -443,7 +443,7 @@
return minFirstLSN;
}
- private long getRemoteMinFirstLSN() {
+ private long getRemoteMinFirstLSN() throws HyracksDataException {
IReplicaResourcesManager remoteResourcesManager =
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
return remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index f922832..47f9315 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -81,11 +81,6 @@
checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled);
final Checkpoint latestCheckpoint = checkpointManager.getLatest();
if (latestCheckpoint != null) {
- if (latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) {
- throw new IllegalStateException(
- String.format("Storage version mismatch. Current version (%s). On disk version: (%s)",
- StorageConstants.VERSION, latestCheckpoint.getStorageVersion()));
- }
transactionManager.ensureMaxTxnId(latestCheckpoint.getMaxTxnId());
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java
new file mode 100644
index 0000000..503b8de
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MigrateStorageResourcesTask.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.io.IODeviceHandle;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.storage.common.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * Migrates a legacy storage structure to the current one
+ */
+public class MigrateStorageResourcesTask implements INCLifecycleTask {
+
+ private static final Logger LOGGER = Logger.getLogger(MigrateStorageResourcesTask.class.getName());
+ private static final long serialVersionUID = 1L;
+ public static final int LEGACY_RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 5;
+
+ @Override
+ public void perform(IControllerService cs) throws HyracksDataException {
+ final INcApplicationContext appCtx = (INcApplicationContext) cs.getApplicationContext();
+ final ICheckpointManager checkpointMgr = appCtx.getTransactionSubsystem().getCheckpointManager();
+ final Checkpoint latestCheckpoint = checkpointMgr.getLatest();
+ if (latestCheckpoint == null) {
+ // nothing to migrate
+ return;
+ }
+ final IIOManager ioManager = appCtx.getIoManager();
+ final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
+ for (IODeviceHandle ioDeviceHandle : ioDevices) {
+ final Path root = Paths.get(ioDeviceHandle.getMount().getAbsolutePath());
+ if (!root.toFile().exists()) {
+ continue;
+ }
+ try (Stream<Path> stream = Files.find(root, LEGACY_RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT,
+ (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) {
+ final List<Path> resourceToMigrate = stream.map(Path::getParent).collect(Collectors.toList());
+ for (Path src : resourceToMigrate) {
+ final Path dest =
+ migrateResourceMetadata(root.relativize(src), appCtx, latestCheckpoint.getStorageVersion());
+ copyResourceFiles(root.resolve(src), root.resolve(dest),
+ PersistentLocalResourceRepository.INDEX_COMPONENTS);
+ FileUtils.deleteDirectory(root.resolve(src).toFile());
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+
+ /**
+ * Migrates the resource metadata file at {@code resourcePath} to the new storage structure
+ * and updates the migrated resource's metadata to reflect the new path.
+ *
+ * @param resourcePath
+ * @param appCtx
+ * @param resourceVersion
+ * @return The migrated resource relative path
+ * @throws HyracksDataException
+ */
+ private Path migrateResourceMetadata(Path resourcePath, INcApplicationContext appCtx, int resourceVersion)
+ throws HyracksDataException {
+ final ILocalResourceRepository localResourceRepository = appCtx.getLocalResourceRepository();
+ final LocalResource srcResource = localResourceRepository.get(resourcePath.toFile().getPath());
+ final DatasetLocalResource lsmResource = (DatasetLocalResource) srcResource.getResource();
+ // recreate the resource with the new path and version
+ final DatasetResourceReference lrr = DatasetResourceReference.of(srcResource, resourceVersion);
+ final Path destPath = lrr.getRelativePath();
+ final FileReference destDir = appCtx.getIoManager().resolve(destPath.toString());
+ // ensure the new dest dir is empty
+ if (destDir.getFile().exists()) {
+ FileUtils.deleteQuietly(destDir.getFile());
+ }
+ lsmResource.setPath(destPath.toString());
+
+ final LocalResource destResource =
+ new LocalResource(srcResource.getId(), srcResource.getVersion(), srcResource.isDurable(), lsmResource);
+ LOGGER.info(() -> "Migrating resource from: " + srcResource.getPath() + " to " + destResource.getPath());
+ localResourceRepository.insert(destResource);
+ return destPath;
+ }
+
+ /**
+ * Copies the files matching {@code filter} at {@code src} path to {@code dest}
+ *
+ * @param src
+ * @param dest
+ * @param filter
+ * @throws IOException
+ */
+ private void copyResourceFiles(Path src, Path dest, Predicate<Path> filter) throws IOException {
+ try (Stream<Path> stream = Files.list(src)) {
+ final List<Path> srcFiles = stream.filter(filter).collect(Collectors.toList());
+ for (Path srcFile : srcFiles) {
+ Path fileDest = Paths.get(dest.toString(), srcFile.getFileName().toString());
+ Files.copy(srcFile, fileDest);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 63f5bfc..47e5ac9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.config.NodeProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.common.utils.PrintUtil;
@@ -46,6 +47,7 @@
import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
import org.apache.asterix.messaging.NCMessageBroker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.utils.CompatibilityUtil;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
@@ -54,7 +56,6 @@
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.api.messages.IMessageBroker;
-import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.BaseNCApplication;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -115,7 +116,10 @@
MessagingChannelInterfaceFactory interfaceFactory =
new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
-
+ final Checkpoint latestCheckpoint = runtimeContext.getTransactionSubsystem().getCheckpointManager().getLatest();
+ if (latestCheckpoint != null) {
+ CompatibilityUtil.ensureCompatibility(controllerService, latestCheckpoint.getStorageVersion());
+ }
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
final SystemState stateOnStartup = recoveryMgr.getSystemState();
if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java
new file mode 100644
index 0000000..5d44fc9
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/CompatibilityUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.asterix.app.nc.task.MigrateStorageResourcesTask;
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class CompatibilityUtil {
+
+ private static final Logger LOGGER = Logger.getLogger(CompatibilityUtil.class.getName());
+ private static final int MIN_COMPATIBLE_VERSION = 1;
+
+ private CompatibilityUtil() {
+ }
+
+ public static void ensureCompatibility(NodeControllerService ncs, int onDiskVerson) throws HyracksDataException {
+ if (onDiskVerson == StorageConstants.VERSION) {
+ return;
+ }
+ ensureUpgradability(onDiskVerson);
+ LOGGER.info(() -> "Upgrading from storage version " + onDiskVerson + " to " + StorageConstants.VERSION);
+ final List<INCLifecycleTask> upgradeTasks = getUpgradeTasks(onDiskVerson);
+ for (INCLifecycleTask task : upgradeTasks) {
+ task.perform(ncs);
+ }
+ }
+
+ private static void ensureUpgradability(int onDiskVerson) {
+ if (onDiskVerson < MIN_COMPATIBLE_VERSION) {
+ throw new IllegalStateException(String.format(
+ "Storage cannot be upgraded to new version. Current version (%s). On disk version: (%s)",
+ StorageConstants.VERSION, onDiskVerson));
+ }
+ }
+
+ private static List<INCLifecycleTask> getUpgradeTasks(int fromVersion) {
+ List<INCLifecycleTask> upgradeTasks = new ArrayList<>();
+ if (fromVersion < StorageConstants.REBALANCE_STORAGE_VERSION) {
+ upgradeTasks.add(new MigrateStorageResourcesTask());
+ }
+ return upgradeTasks;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 33f3a42..87ee2ef 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -122,7 +122,7 @@
// Checks the correctness of results.
ArrayNode splits = (ArrayNode) actualResponse.get("splits");
String path = (splits.get(0)).get("path").asText();
- Assert.assertTrue(path.endsWith("Metadata/Dataset_idx_Dataset"));
+ Assert.assertTrue(path.endsWith("Metadata/Dataset/0/Dataset"));
}
@Test
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
new file mode 100644
index 0000000..e24ef2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.utils.RebalanceUtil;
+import org.junit.Assert;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class TestDataUtil {
+
+ private static final TestExecutor TEST_EXECUTOR = new TestExecutor();
+ private static final TestCaseContext.OutputFormat OUTPUT_FORMAT = TestCaseContext.OutputFormat.CLEAN_JSON;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private TestDataUtil() {
+ }
+
+ /**
+ * Creates dataset with a single field called id as its primary key.
+ *
+ * @param dataset
+ * @throws Exception
+ */
+ public static void createIdOnlyDataset(String dataset) throws Exception {
+ TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE TYPE KeyType IF NOT EXISTS AS { id: int };", OUTPUT_FORMAT);
+ TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + "(KeyType) PRIMARY KEY id;", OUTPUT_FORMAT);
+ }
+
+ /**
+ * Upserts {@code count} ids into {@code dataset}
+ *
+ * @param dataset
+ * @param count
+ * @throws Exception
+ */
+ public static void upsertData(String dataset, long count) throws Exception {
+ for (int i = 0; i < count; i++) {
+ TEST_EXECUTOR.executeSqlppUpdateOrDdl("UPSERT INTO " + dataset + " ({\"id\": " + i + "});",
+ TestCaseContext.OutputFormat.CLEAN_JSON);
+ }
+ }
+
+ /**
+ * Gets the number of records in dataset {@code dataset}
+ *
+ * @param datasetName
+ * @return The count
+ * @throws Exception
+ */
+ public static long getDatasetCount(String datasetName) throws Exception {
+ final String query = "SELECT VALUE COUNT(*) FROM `" + datasetName + "`;";
+ final InputStream responseStream = TEST_EXECUTOR
+ .executeQueryService(query, TEST_EXECUTOR.getEndpoint(Servlets.QUERY_SERVICE), OUTPUT_FORMAT);
+ final ObjectNode response = OBJECT_MAPPER.readValue(responseStream, ObjectNode.class);
+ final JsonNode result = response.get("results");
+ // make sure there is a single value in result
+ Assert.assertEquals(1, result.size());
+ return result.get(0).asInt();
+ }
+
+ /**
+ * Rebalances a dataset to {@code targetNodes}
+ *
+ * @param integrationUtil
+ * @param dataverseName
+ * @param datasetName
+ * @param targetNodes
+ * @throws Exception
+ */
+ public static void rebalanceDataset(AsterixHyracksIntegrationUtil integrationUtil, String dataverseName,
+ String datasetName, String[] targetNodes) throws Exception {
+ ICcApplicationContext ccAppCtx =
+ (ICcApplicationContext) integrationUtil.getClusterControllerService().getApplicationContext();
+ MetadataProvider metadataProvider = new MetadataProvider(ccAppCtx, null);
+ try {
+ ActiveNotificationHandler activeNotificationHandler =
+ (ActiveNotificationHandler) ccAppCtx.getActiveNotificationHandler();
+ activeNotificationHandler.suspend(metadataProvider);
+ try {
+ IMetadataLockManager lockManager = ccAppCtx.getMetadataLockManager();
+ lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(),
+ dataverseName + '.' + datasetName);
+ RebalanceUtil.rebalance(dataverseName, datasetName, new LinkedHashSet<>(Arrays.asList(targetNodes)),
+ metadataProvider, ccAppCtx.getHcc(), NoOpDatasetRebalanceCallback.INSTANCE);
+ } finally {
+ activeNotificationHandler.resume(metadataProvider);
+ }
+ } finally {
+ metadataProvider.getLocks().unlock();
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java
new file mode 100644
index 0000000..5b53041
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.function.Function;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.storage.IndexPathElements;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MigrateStorageResourcesTaskTest {
+
+ private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, DEFAULT_TEST_CONFIG_FILE_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(true);
+ }
+
+ @Test
+ public void storageStructureMigration() throws Exception {
+ Function<IndexPathElements, String> legacyIndexPathProvider = (pathElements) ->
+ (pathElements.getRebalanceCount().equals("0") ? "" : pathElements.getRebalanceCount() + File.separator)
+ + pathElements.getDatasetName() + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + pathElements
+ .getIndexName();
+ StoragePathUtil.setIndexPathProvider(legacyIndexPathProvider);
+ integrationUtil.init(true);
+ // create dataset and insert data using legacy structure
+ String datasetName = "ds";
+ TestDataUtil.createIdOnlyDataset(datasetName);
+ TestDataUtil.upsertData(datasetName, 100);
+ final long countBeforeMigration = TestDataUtil.getDatasetCount(datasetName);
+ // stop NCs
+ integrationUtil.deinit(false);
+ // forge a checkpoint with old version to force migration to new storage structure on all ncs
+ final INcApplicationContext nc1AppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext();
+ final AbstractCheckpointManager nc1CheckpointManager =
+ (AbstractCheckpointManager) nc1AppCtx.getTransactionSubsystem().getCheckpointManager();
+ forgeOldVersionCheckpoint(nc1CheckpointManager);
+ final INcApplicationContext nc2AppCtx = (INcApplicationContext) integrationUtil.ncs[1].getApplicationContext();
+ final AbstractCheckpointManager nc2CheckpointManager =
+ (AbstractCheckpointManager) nc2AppCtx.getTransactionSubsystem().getCheckpointManager();
+ forgeOldVersionCheckpoint(nc2CheckpointManager);
+
+ // remove the legacy path provider to use the new default structure
+ StoragePathUtil.setIndexPathProvider(null);
+ // start the NCs to do the migration
+ integrationUtil.init(false);
+ final long countAfterMigration = TestDataUtil.getDatasetCount(datasetName);
+ // ensure data migrated to new structure without issues
+ Assert.assertEquals(countBeforeMigration, countAfterMigration);
+ }
+
+ private void forgeOldVersionCheckpoint(AbstractCheckpointManager manger) throws HyracksDataException {
+ Checkpoint cp = new Checkpoint(-1, -1, 0, System.currentTimeMillis(), true,
+ StorageConstants.REBALANCE_STORAGE_VERSION - 1);
+ Path path = manger.getCheckpointPath(cp.getTimeStamp());
+ // Write checkpoint file to disk
+ try (BufferedWriter writer = Files.newBufferedWriter(path)) {
+ writer.write(cp.asJson());
+ writer.flush();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
index 723786c..29efa47 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java
@@ -19,38 +19,30 @@
package org.apache.asterix.test.txn;
import java.io.File;
-import java.io.InputStream;
-import java.util.Random;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.common.configuration.Property;
-import org.apache.asterix.common.utils.Servlets;
-import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
import org.apache.asterix.test.common.TestHelper;
-import org.apache.asterix.testframework.context.TestCaseContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
public class RecoveryManagerTest {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml";
private static final String TEST_CONFIG_PATH =
System.getProperty("user.dir") + File.separator + "target" + File.separator + "config";
private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
- private static final TestExecutor testExecutor = new TestExecutor();
private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
- private static final Random random = new Random();
- private static final int numRecords = 1;
@Before
public void setUp() throws Exception {
@@ -74,52 +66,53 @@
@Test
public void multiDatasetRecovery() throws Exception {
String datasetNamePrefix = "ds_";
- final TestCaseContext.OutputFormat format = TestCaseContext.OutputFormat.CLEAN_JSON;
- testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int };", format);
int numDatasets = 50;
String datasetName = null;
for (int i = 1; i <= numDatasets; i++) {
datasetName = datasetNamePrefix + i;
- testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
- insertData(datasetName);
+ TestDataUtil.createIdOnlyDataset(datasetName);
+ TestDataUtil.upsertData(datasetName, 10);
}
+ final long countBeforeFirstRecovery = TestDataUtil.getDatasetCount(datasetName);
// do ungraceful shutdown to enforce recovery
integrationUtil.deinit(false);
integrationUtil.init(false);
- validateRecovery(datasetName);
-
+ final long countAfterFirstRecovery = TestDataUtil.getDatasetCount(datasetName);
+ Assert.assertEquals(countBeforeFirstRecovery, countAfterFirstRecovery);
// create more datasets after recovery
numDatasets = 100;
for (int i = 51; i <= numDatasets; i++) {
datasetName = datasetNamePrefix + i;
- testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + "(KeyType) PRIMARY KEY id;", format);
- insertData(datasetName);
+ TestDataUtil.createIdOnlyDataset(datasetName);
+ TestDataUtil.upsertData(datasetName, 1);
}
+ final long countBeforeSecondRecovery = TestDataUtil.getDatasetCount(datasetName);
// do ungraceful shutdown to enforce recovery again
integrationUtil.deinit(false);
integrationUtil.init(false);
- validateRecovery(datasetName);
+ final long countAfterSecondRecovery = TestDataUtil.getDatasetCount(datasetName);
+ Assert.assertEquals(countBeforeSecondRecovery, countAfterSecondRecovery);
}
- private void insertData(String datasetName) throws Exception {
- for (int i = 0; i < numRecords; i++) {
- testExecutor.executeSqlppUpdateOrDdl("UPSERT INTO " + datasetName + " ({\"id\": " + random.nextInt() + "})",
- TestCaseContext.OutputFormat.CLEAN_JSON);
- }
- }
-
- private void validateRecovery(String datasetName) throws Exception {
- final String query = "select value count(*) from `" + datasetName + "`;";
- final InputStream inputStream = testExecutor
- .executeQueryService(query, testExecutor.getEndpoint(Servlets.QUERY_SERVICE),
- TestCaseContext.OutputFormat.CLEAN_JSON);
- final ObjectNode jsonNodes = OBJECT_MAPPER.readValue(inputStream, ObjectNode.class);
- JsonNode result = jsonNodes.get("results");
- // make sure there is result
- Assert.assertEquals(1, result.size());
- for (int i = 0; i < result.size(); i++) {
- JsonNode json = result.get(i);
- Assert.assertEquals(numRecords, json.asInt());
- }
+ @Test
+ public void reoveryAfterRebalance() throws Exception {
+ String datasetName = "ds";
+ TestDataUtil.createIdOnlyDataset(datasetName);
+ TestDataUtil.upsertData(datasetName, 10);
+ final long countBeforeRebalance = TestDataUtil.getDatasetCount(datasetName);
+ // rebalance dataset to single nc
+ TestDataUtil.rebalanceDataset(integrationUtil, MetadataBuiltinEntities.DEFAULT_DATAVERSE.getDataverseName(),
+ datasetName, new String[] { "asterix_nc2" });
+ // check data after rebalance
+ final long countAfterRebalance = TestDataUtil.getDatasetCount(datasetName);
+ Assert.assertEquals(countBeforeRebalance, countAfterRebalance);
+ // insert data after rebalance
+ TestDataUtil.upsertData(datasetName, 20);
+ final long countBeforeRecovery = TestDataUtil.getDatasetCount(datasetName);
+ // do ungraceful shutdown to enforce recovery
+ integrationUtil.deinit(false);
+ integrationUtil.init(false);
+ final long countAfterRecovery = TestDataUtil.getDatasetCount(datasetName);
+ Assert.assertEquals(countBeforeRecovery, countAfterRecovery);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.11.adm
index f57b2301..d970119 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.11.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.11.adm
@@ -1 +1 @@
-{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch1/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch1/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch1/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch1/2/LineItem_idx_LineItem"}]}
\ No newline at end of file
+{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch1/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch1/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch1/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch1/LineItem/2/LineItem"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.12.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.12.adm
index 39cad23..38d2eac 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.12.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.12.adm
@@ -1 +1 @@
-{"keys":"o_orderkey","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OrderType","open":false,"fields":[{"o_orderkey":{"type":"AInt64"}},{"o_custkey":{"type":"AInt64"}},{"o_orderstatus":{"type":"AString"}},{"o_totalprice":{"type":"ADouble"}},{"o_orderdate":{"type":"AString"}},{"o_orderpriority":{"type":"AString"}},{"o_clerk":{"type":"AString"}},{"o_shippriority":{"type":"AInt64"}},{"o_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch2/2/Orders_idx_Orders"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch2/2/Orders_idx_Orders"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch2/2/Orders_idx_Orders"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch2/2/Orders_idx_Orders"}]}
\ No newline at end of file
+{"keys":"o_orderkey","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OrderType","open":false,"fields":[{"o_orderkey":{"type":"AInt64"}},{"o_custkey":{"type":"AInt64"}},{"o_orderstatus":{"type":"AString"}},{"o_totalprice":{"type":"ADouble"}},{"o_orderdate":{"type":"AString"}},{"o_orderpriority":{"type":"AString"}},{"o_clerk":{"type":"AString"}},{"o_shippriority":{"type":"AInt64"}},{"o_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch2/Orders/2/Orders"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch2/Orders/2/Orders"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch2/Orders/2/Orders"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch2/Orders/2/Orders"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.5.adm
index fa5763e..af9759b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.5.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.5.adm
@@ -1 +1 @@
-{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch1/1/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch1/1/LineItem_idx_LineItem"}]}
\ No newline at end of file
+{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch1/LineItem/1/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch1/LineItem/1/LineItem"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.6.adm
index b3dea05..0003101 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.6.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/all_datasets/all_datasets.6.adm
@@ -1 +1 @@
-{"keys":"o_orderkey","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OrderType","open":false,"fields":[{"o_orderkey":{"type":"AInt64"}},{"o_custkey":{"type":"AInt64"}},{"o_orderstatus":{"type":"AString"}},{"o_totalprice":{"type":"ADouble"}},{"o_orderdate":{"type":"AString"}},{"o_orderpriority":{"type":"AString"}},{"o_clerk":{"type":"AString"}},{"o_shippriority":{"type":"AInt64"}},{"o_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch2/1/Orders_idx_Orders"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch2/1/Orders_idx_Orders"}]}
\ No newline at end of file
+{"keys":"o_orderkey","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OrderType","open":false,"fields":[{"o_orderkey":{"type":"AInt64"}},{"o_custkey":{"type":"AInt64"}},{"o_orderstatus":{"type":"AString"}},{"o_totalprice":{"type":"ADouble"}},{"o_orderdate":{"type":"AString"}},{"o_orderpriority":{"type":"AString"}},{"o_clerk":{"type":"AString"}},{"o_shippriority":{"type":"AInt64"}},{"o_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch2/Orders/1/Orders"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch2/Orders/1/Orders"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
index 8ff99de..374e1af 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
@@ -1 +1 @@
-{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/1/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/1/LineItem_idx_LineItem"}]}
+{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/LineItem/1/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/LineItem/1/LineItem"}]}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm
index 6eece47..ae6e2fb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm
@@ -1 +1 @@
-{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/2/LineItem_idx_LineItem"}]}
\ No newline at end of file
+{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/LineItem/2/LineItem"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset_with_index/single_dataset_with_index.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset_with_index/single_dataset_with_index.10.adm
index 6eece47..ae6e2fb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset_with_index/single_dataset_with_index.10.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset_with_index/single_dataset_with_index.10.adm
@@ -1 +1 @@
-{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/2/LineItem_idx_LineItem"}]}
\ No newline at end of file
+{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/LineItem/2/LineItem"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset_with_index/single_dataset_with_index.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset_with_index/single_dataset_with_index.5.adm
index 8ff99de..374e1af 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset_with_index/single_dataset_with_index.5.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset_with_index/single_dataset_with_index.5.adm
@@ -1 +1 @@
-{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/1/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/1/LineItem_idx_LineItem"}]}
+{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/LineItem/1/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/LineItem/1/LineItem"}]}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.11.adm
index 6eece47..ae6e2fb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.11.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.11.adm
@@ -1 +1 @@
-{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/2/LineItem_idx_LineItem"}]}
\ No newline at end of file
+{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/LineItem/2/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/LineItem/2/LineItem"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.12.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.12.adm
index 5a549ef..f9cd003 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.12.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.12.adm
@@ -1 +1 @@
-{"keys":"o_orderkey","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OrderType","open":false,"fields":[{"o_orderkey":{"type":"AInt64"}},{"o_custkey":{"type":"AInt64"}},{"o_orderstatus":{"type":"AString"}},{"o_totalprice":{"type":"ADouble"}},{"o_orderdate":{"type":"AString"}},{"o_orderpriority":{"type":"AString"}},{"o_clerk":{"type":"AString"}},{"o_shippriority":{"type":"AInt64"}},{"o_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/2/Orders_idx_Orders"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/2/Orders_idx_Orders"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/2/Orders_idx_Orders"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/2/Orders_idx_Orders"}]}
\ No newline at end of file
+{"keys":"o_orderkey","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OrderType","open":false,"fields":[{"o_orderkey":{"type":"AInt64"}},{"o_custkey":{"type":"AInt64"}},{"o_orderstatus":{"type":"AString"}},{"o_totalprice":{"type":"ADouble"}},{"o_orderdate":{"type":"AString"}},{"o_orderpriority":{"type":"AString"}},{"o_clerk":{"type":"AString"}},{"o_shippriority":{"type":"AInt64"}},{"o_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/Orders/2/Orders"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/Orders/2/Orders"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/Orders/2/Orders"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/Orders/2/Orders"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.5.adm
index 8ff99de..374e1af 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.5.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.5.adm
@@ -1 +1 @@
-{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/1/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/1/LineItem_idx_LineItem"}]}
+{"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/LineItem/1/LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/LineItem/1/LineItem"}]}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.6.adm
index 6889a70..bca061b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.6.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataverse/single_dataverse.6.adm
@@ -1 +1 @@
-{"keys":"o_orderkey","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OrderType","open":false,"fields":[{"o_orderkey":{"type":"AInt64"}},{"o_custkey":{"type":"AInt64"}},{"o_orderstatus":{"type":"AString"}},{"o_totalprice":{"type":"ADouble"}},{"o_orderdate":{"type":"AString"}},{"o_orderpriority":{"type":"AString"}},{"o_clerk":{"type":"AString"}},{"o_shippriority":{"type":"AInt64"}},{"o_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/1/Orders_idx_Orders"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/1/Orders_idx_Orders"}]}
\ No newline at end of file
+{"keys":"o_orderkey","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"OrderType","open":false,"fields":[{"o_orderkey":{"type":"AInt64"}},{"o_custkey":{"type":"AInt64"}},{"o_orderstatus":{"type":"AString"}},{"o_totalprice":{"type":"ADouble"}},{"o_orderdate":{"type":"AString"}},{"o_orderpriority":{"type":"AString"}},{"o_clerk":{"type":"AString"}},{"o_shippriority":{"type":"AInt64"}},{"o_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/Orders/1/Orders"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/Orders/1/Orders"}]}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
index 78b5cb2..48bbf00 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/DatasetLocalResource.java
@@ -59,6 +59,11 @@
}
@Override
+ public void setPath(String path) {
+ resource.setPath(path);
+ }
+
+ @Override
public IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException {
return resource.createInstance(ncServiceCtx);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index 6ffa095..1c3f030 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -20,11 +20,14 @@
import java.util.Set;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
public interface IReplicaResourcesManager {
/**
* @param partitions
* @return the minimum LSN of all indexes that belong to {@code partitions}.
+ * @throws HyracksDataException
*/
- public long getPartitionsMinLSN(Set<Integer> partitions);
+ long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
new file mode 100644
index 0000000..d05321e
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.nio.file.Paths;
+
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.storage.common.LocalResource;
+
+public class DatasetResourceReference extends ResourceReference {
+
+ private int datasetId;
+ private int partitionId;
+
+ private DatasetResourceReference() {
+ super();
+ }
+
+ public static DatasetResourceReference of(LocalResource localResource) {
+ return of(localResource, StorageConstants.VERSION);
+ }
+
+ public static DatasetResourceReference of(LocalResource localResource, int version) {
+ if (version < StorageConstants.REBALANCE_STORAGE_VERSION) {
+ // to support legacy storage migration
+ return parseLegacyPath(localResource);
+ }
+ return parse(localResource);
+ }
+
+ public int getDatasetId() {
+ return datasetId;
+ }
+
+ public int getPartitionId() {
+ return partitionId;
+ }
+
+ private static DatasetResourceReference parse(LocalResource localResource) {
+ final DatasetResourceReference datasetResourceReference = new DatasetResourceReference();
+ final String filePath = Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString();
+ parse(datasetResourceReference, filePath);
+ assignIds(localResource, datasetResourceReference);
+ return datasetResourceReference;
+ }
+
+ private static DatasetResourceReference parseLegacyPath(LocalResource localResource) {
+ final DatasetResourceReference datasetResourceReference = new DatasetResourceReference();
+ final String filePath = Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString();
+ parseLegacyPath(datasetResourceReference, filePath);
+ assignIds(localResource, datasetResourceReference);
+ return datasetResourceReference;
+ }
+
+ private static void assignIds(LocalResource localResource, DatasetResourceReference lrr) {
+ final DatasetLocalResource dsResource = (DatasetLocalResource) localResource.getResource();
+ lrr.datasetId = dsResource.getDatasetId();
+ lrr.partitionId = dsResource.getPartition();
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
deleted file mode 100644
index ca6968f..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexFileProperties.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.storage;
-
-import java.io.File;
-
-import org.apache.asterix.common.utils.StoragePathUtil;
-
-/**
- * A holder class for an index file properties.
- */
-public class IndexFileProperties {
-
- private final String fileName;
- private final String idxName;
- private final String dataverseName;
- private final int partitionId;
- private final int datasetId;
-
- public IndexFileProperties(int partitionId, String dataverseName, String idxName, String fileName, int datasetId) {
- this.partitionId = partitionId;
- this.dataverseName = dataverseName;
- this.idxName = idxName;
- this.fileName = fileName;
- this.datasetId = datasetId;
- }
-
- public String getFileName() {
- return fileName;
- }
-
- public String getIdxName() {
- return idxName;
- }
-
- public String getDataverseName() {
- return dataverseName;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public int getDatasetId() {
- return datasetId;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(StoragePathUtil.PARTITION_DIR_PREFIX + partitionId + File.separator);
- sb.append(dataverseName + File.separator);
- sb.append(idxName + File.separator);
- sb.append(fileName);
- sb.append(" [Dataset ID: " + datasetId + "]");
- return sb.toString();
- }
-}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java
new file mode 100644
index 0000000..4d0f3dd
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexPathElements.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+public class IndexPathElements {
+
+ private final String datasetName;
+ private final String indexName;
+ private final String rebalanceCount;
+
+ public IndexPathElements(String datasetName, String indexName, String rebalanceCount) {
+ this.datasetName = datasetName;
+ this.indexName = indexName;
+ this.rebalanceCount = rebalanceCount;
+ }
+
+ public String getDatasetName() {
+ return datasetName;
+ }
+
+ public String getIndexName() {
+ return indexName;
+ }
+
+ public String getRebalanceCount() {
+ return rebalanceCount;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
new file mode 100644
index 0000000..0d65067
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.asterix.common.utils.StoragePathUtil;
+
+public class ResourceReference {
+
+ protected String root;
+ protected String partition;
+ protected String dataverse;
+ protected String dataset;
+ protected String rebalance;
+ protected String index;
+ protected String name;
+
+ protected ResourceReference() {
+ }
+
+ public static ResourceReference of(String localResourcePath) {
+ ResourceReference lrr = new ResourceReference();
+ parse(lrr, localResourcePath);
+ return lrr;
+ }
+
+ public String getPartition() {
+ return partition;
+ }
+
+ public String getDataverse() {
+ return dataverse;
+ }
+
+ public String getDataset() {
+ return dataset;
+ }
+
+ public String getRebalance() {
+ return rebalance;
+ }
+
+ public String getIndex() {
+ return index;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Path getRelativePath() {
+ return Paths.get(root, partition, dataverse, dataset, rebalance, index);
+ }
+
+ protected static void parse(ResourceReference ref, String path) {
+ // format: root/partition/dataverse/dataset/rebalanceCount/index/fileName
+ final String[] tokens = path.split(File.separator);
+ if (tokens.length < 6) {
+ throw new IllegalStateException("Unrecognized path structure: " + path);
+ }
+ int offset = tokens.length;
+ ref.name = tokens[--offset];
+ ref.index = tokens[--offset];
+ ref.rebalance = tokens[--offset];
+ ref.dataset = tokens[--offset];
+ ref.dataverse = tokens[--offset];
+ ref.partition = tokens[--offset];
+ ref.root = tokens[--offset];
+ }
+
+ protected static void parseLegacyPath(ResourceReference ref, String path) {
+ // old format: root/partition/dataverse/datasetName_idx_IndexName/fileName
+ final String[] tokens = path.split(File.separator);
+ if (tokens.length < 4) {
+ throw new IllegalStateException("Unrecognized legacy path structure: " + path);
+ }
+ int offset = tokens.length;
+ ref.name = tokens[--offset];
+ // split combined dataset/index name
+ final String[] indexTokens = tokens[--offset].split(StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR);
+ if (indexTokens.length != 2) {
+ throw new IllegalStateException("Unrecognized legacy path structure: " + path);
+ }
+ ref.dataset = indexTokens[0];
+ ref.index = indexTokens[1];
+ ref.dataverse = tokens[--offset];
+ ref.partition = tokens[--offset];
+ ref.root = tokens[--offset];
+ ref.rebalance = String.valueOf(0);
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 49d64d6..48769d4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -24,13 +24,25 @@
* A static class that stores storage constants
*/
public class StorageConstants {
- public static final String METADATA_ROOT = "root_metadata";
- /** The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). */
- private static final int LOCAL_STORAGE_VERSION = 1;
- /** The storage version of AsterixDB stack. */
+ public static final String METADATA_ROOT = "root_metadata";
+ public static final String METADATA_FILE_NAME = ".metadata";
+
+ /**
+ * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
+ */
+ private static final int LOCAL_STORAGE_VERSION = 2;
+
+ /**
+ * The storage version of AsterixDB stack.
+ */
public static final int VERSION = LOCAL_STORAGE_VERSION + ITreeIndexFrame.Constants.VERSION;
+ /**
+ * The storage version in which the rebalance storage structure was introduced
+ */
+ public static final int REBALANCE_STORAGE_VERSION = 8;
+
private StorageConstants() {
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 027f72c..5110d74 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,8 +19,12 @@
package org.apache.asterix.common.utils;
import java.io.File;
+import java.nio.file.Paths;
+import java.util.function.Function;
import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.storage.IndexPathElements;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -36,6 +40,7 @@
private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName());
public static final String PARTITION_DIR_PREFIX = "partition_";
public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
+ private static Function<IndexPathElements, String> indexPathProvider;
private StoragePathUtil() {
}
@@ -69,8 +74,10 @@
}
private static String prepareFullIndexName(String datasetName, String idxName, long rebalanceCount) {
- return (rebalanceCount == 0 ? "" : rebalanceCount + File.separator) + datasetName + DATASET_INDEX_NAME_SEPARATOR
- + idxName;
+ if (indexPathProvider != null) {
+ return indexPathProvider.apply(new IndexPathElements(datasetName, idxName, String.valueOf(rebalanceCount)));
+ }
+ return datasetName + File.separator + rebalanceCount + File.separator + idxName;
}
public static int getPartitionNumFromName(String name) {
@@ -88,10 +95,7 @@
* @return the file relative path starting from the partition directory
*/
public static String getIndexFileRelativePath(String fileAbsolutePath) {
- String[] tokens = fileAbsolutePath.split(File.separator);
- //partition/dataverse/idx/fileName
- return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
- + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+ return ResourceReference.of(fileAbsolutePath).getRelativePath().toString();
}
/**
@@ -136,7 +140,10 @@
* @return The index name
*/
public static String getIndexNameFromPath(String path) {
- int idx = path.lastIndexOf(DATASET_INDEX_NAME_SEPARATOR);
- return idx != -1 ? path.substring(idx + DATASET_INDEX_NAME_SEPARATOR.length()) : path;
+ return Paths.get(path).getFileName().toString();
+ }
+
+ public static void setIndexPathProvider(Function<IndexPathElements, String> indexPathProvider) {
+ StoragePathUtil.indexPathProvider = indexPathProvider;
}
}
diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml
index 3138806..f209aae 100644
--- a/asterixdb/asterix-replication/pom.xml
+++ b/asterixdb/asterix-replication/pom.xml
@@ -43,11 +43,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-metadata</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
</dependency>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 9d8c351..3143284 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -54,7 +54,7 @@
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.IReplicationThread;
import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
@@ -392,7 +392,7 @@
//start sending files
for (String filePath : filesList) {
// Send only files of datasets that are replciated.
- IndexFileProperties indexFileRef = localResourceRep.getIndexFileRef(filePath);
+ DatasetResourceReference indexFileRef = localResourceRep.getLocalResourceReference(filePath);
if (!repStrategy.isMatch(indexFileRef.getDatasetId())) {
continue;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index b0aa0fb..48c7083 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -62,7 +62,7 @@
import org.apache.asterix.common.replication.Replica.ReplicaState;
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.replication.ReplicationJob;
-import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
@@ -280,7 +280,7 @@
//all of the job's files belong to a single storage partition.
//get any of them to determine the partition from the file path.
String jobFile = job.getJobFiles().iterator().next();
- IndexFileProperties indexFileRef = localResourceRepo.getIndexFileRef(jobFile);
+ DatasetResourceReference indexFileRef = localResourceRepo.getLocalResourceReference(jobFile);
if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
return;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index a8b15d2..7ca6f2f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -23,9 +23,11 @@
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.replication.logging.TxnLogUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
@@ -93,18 +95,16 @@
return lsmCompProp;
}
- public String getMaskPath(ReplicaResourcesManager resourceManager) {
+ public String getMaskPath(ReplicaResourcesManager resourceManager) throws HyracksDataException {
if (maskPath == null) {
LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
- //split the index file path to get the LSM component file name
- afp.splitFileName();
maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName()
+ ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
}
return maskPath;
}
- public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) {
+ public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) throws HyracksDataException {
if (replicaPath == null) {
LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
replicaPath = resourceManager.getIndexPath(afp);
@@ -118,23 +118,10 @@
* @return a unique id based on the timestamp of the component
*/
public static String getLSMComponentID(String filePath) {
- String[] tokens = filePath.split(File.separator);
-
- int arraySize = tokens.length;
- String fileName = tokens[arraySize - 1];
- String idxName = tokens[arraySize - 2];
- String dataverse = tokens[arraySize - 3];
- String partitionName = tokens[arraySize - 4];
-
- StringBuilder componentId = new StringBuilder();
- componentId.append(partitionName);
- componentId.append(File.separator);
- componentId.append(dataverse);
- componentId.append(File.separator);
- componentId.append(idxName);
- componentId.append(File.separator);
- componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)));
- return componentId.toString();
+ final ResourceReference ref = ResourceReference.of(filePath);
+ final String fileUniqueTimestamp =
+ ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
+ return Paths.get(ref.getRelativePath().toString(), fileUniqueTimestamp).toString();
}
public String getComponentId() {
@@ -149,18 +136,10 @@
return nodeId;
}
- public int getNumberOfFiles() {
- return numberOfFiles.get();
- }
-
public int markFileComplete() {
return numberOfFiles.decrementAndGet();
}
- public void setNumberOfFiles(AtomicInteger numberOfFiles) {
- this.numberOfFiles = numberOfFiles;
- }
-
public Long getReplicaLSN() {
return replicaLSN;
}
@@ -173,10 +152,6 @@
return opType;
}
- public void setOpType(LSMOperationType opType) {
- this.opType = opType;
- }
-
public String getNodeUniqueLSN() {
return TxnLogUtil.getNodeUniqueLSN(nodeId, originalLSN);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index eb9e82d..f2747fe 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -20,24 +20,18 @@
import java.io.DataInput;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
-
-import org.apache.asterix.common.utils.StoragePathUtil;
+import java.nio.file.Paths;
public class LSMIndexFileProperties {
- private String fileName;
private long fileSize;
private String nodeId;
- private String dataverse;
- private String idxName;
private boolean lsmComponentFile;
private String filePath;
private boolean requiresAck = false;
private long LSNByteOffset;
- private int partition;
public LSMIndexFileProperties() {
}
@@ -61,15 +55,6 @@
this.requiresAck = requiresAck;
}
- public void splitFileName() {
- String[] tokens = filePath.split(File.separator);
- int arraySize = tokens.length;
- this.fileName = tokens[arraySize - 1];
- this.idxName = tokens[arraySize - 2];
- this.dataverse = tokens[arraySize - 3];
- this.partition = StoragePathUtil.getPartitionNumFromName(tokens[arraySize - 4]);
- }
-
public void serialize(OutputStream out) throws IOException {
DataOutputStream dos = new DataOutputStream(out);
dos.writeUTF(nodeId);
@@ -100,26 +85,10 @@
return fileSize;
}
- public String getFileName() {
- return fileName;
- }
-
public String getNodeId() {
return nodeId;
}
- public String getDataverse() {
- return dataverse;
- }
-
- public void setDataverse(String dataverse) {
- this.dataverse = dataverse;
- }
-
- public String getIdxName() {
- return idxName;
- }
-
public boolean isLSMComponentFile() {
return lsmComponentFile;
}
@@ -128,16 +97,17 @@
return requiresAck;
}
+ public String getFileName() {
+ return Paths.get(filePath).toFile().getName();
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("File Name: " + fileName + " ");
+ sb.append("File Path: " + filePath + " ");
sb.append("File Size: " + fileSize + " ");
sb.append("Node ID: " + nodeId + " ");
- sb.append("Partition: " + partition + " ");
- sb.append("IDX Name: " + idxName + " ");
sb.append("isLSMComponentFile : " + lsmComponentFile + " ");
- sb.append("Dataverse: " + dataverse);
sb.append("LSN Byte Offset: " + LSNByteOffset);
return sb.toString();
}
@@ -145,8 +115,4 @@
public long getLSNByteOffset() {
return LSNByteOffset;
}
-
- public int getPartition() {
- return partition;
- }
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index cf8e001..7eea4a4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,14 +38,14 @@
import java.util.logging.Logger;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
@@ -63,7 +63,7 @@
nodePartitions = metadataProperties.getNodePartitions();
}
- public void deleteIndexFile(LSMIndexFileProperties afp) {
+ public void deleteIndexFile(LSMIndexFileProperties afp) throws HyracksDataException {
String indexPath = getIndexPath(afp);
if (indexPath != null) {
if (afp.isLSMComponentFile()) {
@@ -78,20 +78,12 @@
}
}
- public String getIndexPath(LSMIndexFileProperties fileProperties) {
- fileProperties.splitFileName();
- //get partition path in this node
- String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition());
- //get index path
- String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(),
- fileProperties.getDataverse(), fileProperties.getIdxName());
-
- Path path = Paths.get(indexPath);
- if (!Files.exists(path)) {
- File indexFolder = new File(indexPath);
- indexFolder.mkdirs();
+ public String getIndexPath(LSMIndexFileProperties fileProperties) throws HyracksDataException {
+ final FileReference indexPath = localRepository.getIndexPath(Paths.get(fileProperties.getFilePath()));
+ if (!indexPath.getFile().exists()) {
+ indexPath.getFile().mkdirs();
}
- return indexPath;
+ return indexPath.toString();
}
public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException {
@@ -123,21 +115,21 @@
updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), lsnMap);
}
- public Set<File> getReplicaIndexes(String replicaId) {
+ public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException {
Set<File> remoteIndexesPaths = new HashSet<File>();
ClusterPartition[] partitions = nodePartitions.get(replicaId);
for (ClusterPartition partition : partitions) {
- remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId()));
+ remoteIndexesPaths.addAll(localRepository.getPartitionIndexes(partition.getPartitionId()));
}
return remoteIndexesPaths;
}
@Override
- public long getPartitionsMinLSN(Set<Integer> partitions) {
+ public long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException {
long minRemoteLSN = Long.MAX_VALUE;
for (Integer partition : partitions) {
//for every index in replica
- Set<File> remoteIndexes = getPartitionIndexes(partition);
+ Set<File> remoteIndexes = localRepository.getPartitionIndexes(partition);
for (File indexFolder : remoteIndexes) {
//read LSN map
try {
@@ -164,7 +156,7 @@
for (File indexFolder : remoteIndexes) {
if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
File localResource = new File(
- indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME);
+ indexFolder + File.separator + StorageConstants.METADATA_FILE_NAME);
LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
laggingReplicaIndexes.put(resource.getId(), indexFolder.getAbsolutePath());
}
@@ -190,7 +182,12 @@
public void cleanInvalidLSMComponents(String replicaId) {
//for every index in replica
- Set<File> remoteIndexes = getReplicaIndexes(replicaId);
+ Set<File> remoteIndexes = null;
+ try {
+ remoteIndexes = getReplicaIndexes(replicaId);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
for (File remoteIndexFile : remoteIndexes) {
//search for any mask
File[] masks = remoteIndexFile.listFiles(LSM_COMPONENTS_MASKS_FILTER);
@@ -241,41 +238,11 @@
/**
* @param partition
- * @return Set of file references to each index in the partition
- */
- public Set<File> getPartitionIndexes(int partition) {
- Set<File> partitionIndexes = new HashSet<File>();
- String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
- String partitionStoragePath = localRepository.getPartitionPath(partition)
- + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition);
- File partitionRoot = new File(partitionStoragePath);
- if (partitionRoot.exists() && partitionRoot.isDirectory()) {
- File[] dataverseFileList = partitionRoot.listFiles();
- if (dataverseFileList != null) {
- for (File dataverseFile : dataverseFileList) {
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- if (indexFile.isDirectory()) {
- partitionIndexes.add(indexFile);
- }
- }
- }
- }
- }
- }
- }
- return partitionIndexes;
- }
-
- /**
- * @param partition
* @return Absolute paths to all partition files
*/
- public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) {
+ public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException {
List<String> partitionFiles = new ArrayList<String>();
- Set<File> partitionIndexes = getPartitionIndexes(partition);
+ Set<File> partitionIndexes = localRepository.getPartitionIndexes(partition);
for (File indexDir : partitionIndexes) {
if (indexDir.isDirectory()) {
File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
@@ -284,8 +251,7 @@
if (!relativePath) {
partitionFiles.add(file.getAbsolutePath());
} else {
- partitionFiles.add(
- StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath()));
+ partitionFiles.add(StoragePathUtil.getIndexFileRelativePath(file.getAbsolutePath()));
}
}
}
@@ -311,7 +277,7 @@
private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
- return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith(".");
+ return name.equalsIgnoreCase(StorageConstants.METADATA_FILE_NAME) || !name.startsWith(".");
}
};
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index db3647e..04cbef9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -23,22 +23,26 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.MetadataProperties;
@@ -47,7 +51,8 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.ReplicationJob;
-import org.apache.asterix.common.storage.IndexFileProperties;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.commons.io.FileUtils;
@@ -68,15 +73,14 @@
public class PersistentLocalResourceRepository implements ILocalResourceRepository {
- // Public constants
- public static final String METADATA_FILE_NAME = ".metadata";
+ public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME);
// Private constants
private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
private static final String STORAGE_METADATA_DIRECTORY = StorageConstants.METADATA_ROOT;
private static final String STORAGE_METADATA_FILE_NAME_PREFIX = "." + StorageConstants.METADATA_ROOT;
private static final int MAX_CACHED_RESOURCES = 1000;
- private static final FilenameFilter METADATA_FILES_FILTER =
- (File dir, String name) -> name.equalsIgnoreCase(METADATA_FILE_NAME);
+ public static final int RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 6;
+
// Finals
private final IIOManager ioManager;
private final String[] mountPoints;
@@ -157,8 +161,9 @@
//make dirs for the storage metadata file
boolean success = storageMetadataDir.mkdirs();
if (!success) {
- throw HyracksDataException.create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED,
- getClass().getSimpleName(), storageMetadataDir.getAbsolutePath());
+ throw HyracksDataException
+ .create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED, getClass().getSimpleName(),
+ storageMetadataDir.getAbsolutePath());
}
LOGGER.log(Level.INFO,
"created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath());
@@ -198,8 +203,8 @@
throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
}
- try (FileOutputStream fos = new FileOutputStream(resourceFile.getFile());
- ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
+ try (FileOutputStream fos = new FileOutputStream(
+ resourceFile.getFile()); ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
oosToFos.writeObject(resource);
oosToFos.flush();
} catch (IOException e) {
@@ -226,27 +231,23 @@
} finally {
// Regardless of successfully deleted or not, the operation should be replicated.
//if replication enabled, delete resource from remote replicas
- if (isReplicationEnabled
- && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
+ if (isReplicationEnabled && !resourceFile.getFile().getName()
+ .startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
createReplicationJob(ReplicationOperation.DELETE, resourceFile);
}
}
} else {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,
- relativePath);
+ throw HyracksDataException
+ .create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath);
}
}
private static FileReference getLocalResourceFileByName(IIOManager ioManager, String resourcePath)
throws HyracksDataException {
- String fileName = resourcePath + File.separator + METADATA_FILE_NAME;
+ String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME;
return ioManager.resolve(fileName);
}
-
- public Map<Long, LocalResource> loadAndGetAllResources() throws IOException {
- //TODO During recovery, the memory usage currently is proportional to the number of resources available.
- //This could be fixed by traversing all resources on disk until the required resource is found.
- LOGGER.log(Level.INFO, "Loading all resources");
+ public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException {
Map<Long, LocalResource> resourcesMap = new HashMap<>();
for (int i = 0; i < mountPoints.length; i++) {
File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i);
@@ -255,108 +256,43 @@
continue;
}
LOGGER.log(Level.INFO, "Getting storage root dir returned " + storageRootDir.getAbsolutePath());
- //load all local resources.
- File[] partitions = storageRootDir.listFiles();
- LOGGER.log(Level.INFO, "Number of partitions found = " + partitions.length);
- for (File partition : partitions) {
- File[] dataverseFileList = partition.listFiles();
- LOGGER.log(Level.INFO, "Reading partition = " + partition.getName() + ". Number of dataverses found: "
- + dataverseFileList.length);
- if (dataverseFileList != null) {
- for (File dataverseFile : dataverseFileList) {
- loadDataverse(dataverseFile, resourcesMap);
+ try (Stream<Path> stream = Files.find(storageRootDir.toPath(), RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT,
+ (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) {
+ final List<File> resourceMetadataFiles = stream.map(Path::toFile).collect(Collectors.toList());
+ for (File file : resourceMetadataFiles) {
+ final LocalResource localResource = PersistentLocalResourceRepository.readLocalResource(file);
+ if (filter.test(localResource)) {
+ resourcesMap.put(localResource.getId(), localResource);
}
}
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
}
return resourcesMap;
+
}
- private void loadDataverse(File dataverseFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException {
- LOGGER.log(Level.INFO, "Loading dataverse:" + dataverseFile.getName());
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- loadIndex(indexFile, resourcesMap);
- }
- }
- }
- }
-
- private void loadIndex(File indexFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException {
- LOGGER.log(Level.INFO, "Loading index:" + indexFile.getName());
- if (indexFile.isDirectory()) {
- File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- LOGGER.log(Level.INFO, "Resource loaded " + localResource.getId() + ":" + localResource.getPath());
- resourcesMap.put(localResource.getId(), localResource);
- }
- }
- }
+ public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
+ return getResources(p -> true);
}
@Override
public long maxId() throws HyracksDataException {
- long maxResourceId = 0;
- for (int i = 0; i < mountPoints.length; i++) {
- File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i);
- if (storageRootDir == null) {
- continue;
- }
-
- //load all local resources.
- File[] partitions = storageRootDir.listFiles();
- for (File partition : partitions) {
- //traverse all local resources.
- File[] dataverseFileList = partition.listFiles();
- if (dataverseFileList != null) {
- for (File dataverseFile : dataverseFileList) {
- maxResourceId = getMaxResourceIdForDataverse(dataverseFile, maxResourceId);
- }
- }
- }
- }
- return maxResourceId;
- }
-
- private long getMaxResourceIdForDataverse(File dataverseFile, long maxSoFar) throws HyracksDataException {
- long maxResourceId = maxSoFar;
- if (dataverseFile.isDirectory()) {
- File[] indexFileList = dataverseFile.listFiles();
- if (indexFileList != null) {
- for (File indexFile : indexFileList) {
- maxResourceId = getMaxResourceIdForIndex(indexFile, maxResourceId);
- }
- }
- }
- return maxResourceId;
- }
-
- private long getMaxResourceIdForIndex(File indexFile, long maxSoFar) throws HyracksDataException {
- long maxResourceId = maxSoFar;
- if (indexFile.isDirectory()) {
- File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
- if (metadataFiles != null) {
- for (File metadataFile : metadataFiles) {
- LocalResource localResource = readLocalResource(metadataFile);
- maxResourceId = Math.max(maxResourceId, localResource.getId());
- }
- }
- }
- return maxResourceId;
+ final Map<Long, LocalResource> allResources = loadAndGetAllResources();
+ final Optional<Long> max = allResources.keySet().stream().max(Long::compare);
+ return max.isPresent() ? max.get() : 0;
}
private static String getFileName(String path) {
- return path.endsWith(File.separator) ? (path + METADATA_FILE_NAME)
- : (path + File.separator + METADATA_FILE_NAME);
+ return path.endsWith(File.separator) ?
+ (path + StorageConstants.METADATA_FILE_NAME) :
+ (path + File.separator + StorageConstants.METADATA_FILE_NAME);
}
public static LocalResource readLocalResource(File file) throws HyracksDataException {
- try (FileInputStream fis = new FileInputStream(file);
- ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
+ try (FileInputStream fis = new FileInputStream(file); ObjectInputStream oisFromFis = new ObjectInputStream(
+ fis)) {
LocalResource resource = (LocalResource) oisFromFis.readObject();
if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
return resource;
@@ -425,8 +361,9 @@
* @return A file reference to the storage metadata file.
*/
private static FileReference getStorageMetadataFile(IIOManager ioManager, String nodeId, int ioDeviceId) {
- String storageMetadataFileName = STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice"
- + ioDeviceId + File.separator + STORAGE_METADATA_FILE_NAME_PREFIX;
+ String storageMetadataFileName =
+ STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId + File.separator
+ + STORAGE_METADATA_FILE_NAME_PREFIX;
return new FileReference(ioManager.getIODevices().get(ioDeviceId), storageMetadataFileName);
}
@@ -483,10 +420,6 @@
return Collections.unmodifiableSet(nodeInactivePartitions);
}
- public Set<Integer> getNodeOrignalPartitions() {
- return Collections.unmodifiableSet(nodeOriginalPartitions);
- }
-
public synchronized void addActivePartition(int partitonId) {
nodeActivePartitions.add(partitonId);
nodeInactivePartitions.remove(partitonId);
@@ -497,27 +430,43 @@
nodeActivePartitions.remove(partitonId);
}
- private static String getLocalResourceRelativePath(String absolutePath) {
- final String[] tokens = absolutePath.split(File.separator);
- // Format: storage_dir/partition/dataverse/idx
- return tokens[tokens.length - 5] + File.separator + tokens[tokens.length - 4] + File.separator
- + tokens[tokens.length - 3] + File.separator + tokens[tokens.length - 2];
+ public DatasetResourceReference getLocalResourceReference(String absoluteFilePath) throws HyracksDataException {
+ //TODO pass relative path
+ final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
+ final LocalResource lr = get(localResourcePath);
+ return DatasetResourceReference.of(lr);
}
- public IndexFileProperties getIndexFileRef(String absoluteFilePath) throws HyracksDataException {
- //TODO pass relative path
- final String[] tokens = absoluteFilePath.split(File.separator);
- if (tokens.length < 5) {
- throw new HyracksDataException("Invalid file format");
+ /**
+ * Gets a set of files for the indexes in partition {@code partition}. Each file points
+ * the to where the index's files are stored.
+ *
+ * @param partition
+ * @return The set of indexes files
+ * @throws HyracksDataException
+ */
+ public Set<File> getPartitionIndexes(int partition) throws HyracksDataException {
+ final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> {
+ DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+ return dsResource.getPartition() == partition;
+ });
+ Set<File> indexes = new HashSet<>();
+ for (LocalResource localResource : partitionResourcesMap.values()) {
+ indexes.add(ioManager.resolve(localResource.getPath()).getFile());
}
- String fileName = tokens[tokens.length - 1];
- String index = tokens[tokens.length - 2];
- String dataverse = tokens[tokens.length - 3];
- String partition = tokens[tokens.length - 4];
- int partitionId = StoragePathUtil.getPartitionNumFromName(partition);
- String relativePath = getLocalResourceRelativePath(absoluteFilePath);
- final LocalResource lr = get(relativePath);
- int datasetId = lr == null ? -1 : ((DatasetLocalResource) lr.getResource()).getDatasetId();
- return new IndexFileProperties(partitionId, dataverse, index, fileName, datasetId);
+ return indexes;
+ }
+
+ /**
+ * Given any index file, an absolute {@link FileReference} is returned which points to where the index of
+ * {@code indexFile} is located.
+ *
+ * @param indexFile
+ * @return
+ * @throws HyracksDataException
+ */
+ public FileReference getIndexPath(Path indexFile) throws HyracksDataException {
+ final ResourceReference ref = ResourceReference.of(indexFile.toString());
+ return ioManager.resolve(ref.getRelativePath().toString());
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
index 6ce543b..9f5b83c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
@@ -119,7 +119,7 @@
return minFirstLSN;
}
- private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) {
+ private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) throws HyracksDataException {
final IReplicaResourcesManager remoteResourcesManager =
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
final IApplicationContext propertiesProvider =
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java
index b9ad1b1..4cf145b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeResource.java
@@ -35,7 +35,7 @@
public class BTreeResource implements IResource {
private static final long serialVersionUID = 1L;
- private final String path;
+ private String path;
private final IStorageManager storageManager;
private final ITypeTraits[] typeTraits;
private final IBinaryComparatorFactory[] comparatorFactories;
@@ -63,4 +63,9 @@
public String getPath() {
return path;
}
+
+ @Override
+ public void setPath(String path) {
+ this.path = path;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
index 6255c1d..b541750 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LsmResource.java
@@ -18,13 +18,10 @@
*/
package org.apache.hyracks.storage.am.lsm.common.dataflow;
-import java.util.List;
import java.util.Map;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-import org.apache.hyracks.api.io.IIOManager;
-import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
@@ -43,7 +40,7 @@
public abstract class LsmResource implements IResource {
private static final long serialVersionUID = 1L;
- protected final String path;
+ protected String path;
protected final IStorageManager storageManager;
protected final ITypeTraits[] typeTraits;
protected final IBinaryComparatorFactory[] cmpFactories;
@@ -88,14 +85,8 @@
return path;
}
- public static int getIoDeviceNum(IIOManager ioManager, IODeviceHandle deviceHandle) {
- List<IODeviceHandle> ioDevices = ioManager.getIODevices();
- for (int i = 0; i < ioDevices.size(); i++) {
- IODeviceHandle device = ioDevices.get(i);
- if (device == deviceHandle) {
- return i;
- }
- }
- return -1;
+ @Override
+ public void setPath(String path) {
+ this.path = path;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java
index df4fbf2..f9eb844 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeResource.java
@@ -35,7 +35,7 @@
public class RTreeResource implements IResource {
private static final long serialVersionUID = 1L;
- private final String path;
+ private String path;
private final IStorageManager storageManager;
private final ITypeTraits[] typeTraits;
private final IBinaryComparatorFactory[] comparatorFactories;
@@ -68,4 +68,8 @@
return path;
}
+ @Override
+ public void setPath(String path) {
+ this.path = path;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java
index bb27023..7b9166d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResource.java
@@ -28,4 +28,11 @@
IIndex createInstance(INCServiceContext ncServiceCtx) throws HyracksDataException;
String getPath();
+
+ /**
+ * Sets the path of {@link IResource}.
+ *
+ * @param path
+ */
+ void setPath(String path);
}