[ASTERIXDB-3177][CONF] Store ingestion logs outside of data storage dir
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Store ingestion logs on the first iodevice of each node rather
than inside the data storage dir.
- Move columnar dataset rebalance test to rebalance test suite.
Change-Id: Ia5f836fa99f3c982b6420b092d40d88f5f6429c1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17517
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
index 511e27b..a4158ea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
@@ -99,5 +99,10 @@
<output-dir compare="Text">all_datasets</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="column">
+ <compilation-unit name="rebalance">
+ <output-dir compare="Text">rebalance</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 08f71f7..299e32a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -16274,11 +16274,6 @@
<output-dir compare="Text">secondary-index/create-index/after-upsert-with-meta</output-dir>
</compilation-unit>
</test-case>
- <test-case FilePath="column">
- <compilation-unit name="rebalance">
- <output-dir compare="Text">rebalance</output-dir>
- </compilation-unit>
- </test-case>
</test-group>
<test-group name="copy">
<test-case FilePath="copy">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index c26fe76..5d6322b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -31,6 +31,7 @@
public class StorageConstants {
public static final String STORAGE_ROOT_DIR_NAME = "storage";
+ public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
public static final String PARTITION_DIR_PREFIX = "partition_";
/**
* Any file that shares the same directory as the LSM index files must
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index c87f368..a9ed066 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.DefaultIoDeviceFileSplit;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.MappedFileSplit;
@@ -65,11 +66,19 @@
return new MappedFileSplit(partition.getActiveNodeId(), relativePath, partition.getIODeviceNum());
}
+ public static FileSplit getDefaultIoDeviceFileSpiltForNode(String nodeId, String relativePath) {
+ return new DefaultIoDeviceFileSplit(nodeId, relativePath);
+ }
+
public static String prepareStoragePartitionPath(int partitonId) {
return Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partitonId)
.toString();
}
+ public static String prepareIngestionLogPath() {
+ return Paths.get(StorageConstants.INGESTION_LOGS_DIR_NAME).toString();
+ }
+
public static String prepareDataverseIndexName(DataverseName dataverseName, String datasetName, String idxName,
long rebalanceCount) {
return prepareDataverseComponentName(dataverseName, prepareFullIndexName(datasetName, idxName, rebalanceCount));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 3ac8a02..214f3b3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -94,7 +94,8 @@
}
if (isFeed) {
if (feedLogManager == null) {
- feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
+ feedLogManager =
+ new FeedLogManager(feedLogFileSplits[partition].getFileReference(ctx.getIoManager()).getFile());
}
feedLogManager.touch();
}
@@ -146,7 +147,7 @@
this.isFeed = ExternalDataUtils.isFeed(configuration);
if (isFeed) {
//TODO(partitioning) make this code reuse DataPartitioningProvider
- feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, ExternalDataUtils.getDatasetDataverse(configuration),
+ feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDatasetDataverse(configuration),
ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 0a91ae8..5baefcb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -28,7 +28,6 @@
import java.util.Random;
import java.util.Set;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -45,11 +44,8 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.util.IntSerDeUtils;
@@ -84,46 +80,28 @@
private FeedUtils() {
}
- public static FileSplit splitsForAdapter(DataverseName dataverseName, String feedName, String nodeName,
- ClusterPartition partition) {
+ private static FileSplit splitsForAdapter(DataverseName dataverseName, String feedName, String nodeName) {
String relPathFile = StoragePathUtil.prepareDataverseComponentName(dataverseName, feedName);
- String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(partition.getPartitionId());
+ String storagePartitionPath = StoragePathUtil.prepareIngestionLogPath();
// Note: feed adapter instances in a single node share the feed logger
- // format: 'storage dir name'/partition_#/dataverse_part1[^dataverse_part2[...]]/feed/node
+ // format: 'ingestion logs dir name'/dataverse_part1[^dataverse_part2[...]]/feed/node
File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
- return StoragePathUtil.getFileSplitForClusterPartition(partition, f.getPath());
+ return StoragePathUtil.getDefaultIoDeviceFileSpiltForNode(nodeName, f.getPath());
}
- public static FileSplit[] splitsForAdapter(ICcApplicationContext appCtx, DataverseName dataverseName,
- String feedName, AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
+ public static FileSplit[] splitsForAdapter(DataverseName dataverseName, String feedName,
+ AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
}
String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
List<FileSplit> splits = new ArrayList<>();
for (String nd : locations) {
- splits.add(splitsForAdapter(dataverseName, feedName, nd,
- appCtx.getClusterStateManager().getNodePartitions(nd)[0]));
+ splits.add(splitsForAdapter(dataverseName, feedName, nd));
}
return splits.toArray(new FileSplit[] {});
}
- public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) {
- return ioManager.getFileReference(ioDeviceId, relativePath);
- }
-
- public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition,
- FileSplit[] feedLogFileSplits) throws HyracksDataException {
- return new FeedLogManager(
- FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIoManager()).getFile());
- }
-
- public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit)
- throws HyracksDataException {
- return new FeedLogManager(
- FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile());
- }
-
public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta)
throws HyracksDataException {
// read the message and reduce the number of tuples
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index 7257958..63af664a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -79,7 +79,7 @@
AlgebricksAbsolutePartitionConstraint locations =
new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
FileSplit[] feedLogFileSplits =
- FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
+ FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(spC.second));
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java
new file mode 100644
index 0000000..9f88f31
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.io;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A FileSplit that is mapped to the default IO device
+ */
+public class DefaultIoDeviceFileSplit extends MappedFileSplit {
+
+ public static final int DEFAULT_IO_DEVICE_IDX = 0;
+ private static final long serialVersionUID = 1L;
+ private transient FileReference cached;
+
+ /**
+ * Construct a managed File split that is mapped to the default IO device
+ * @param node
+ * @param path
+ */
+ public DefaultIoDeviceFileSplit(String node, String path) {
+ super(node, path, DEFAULT_IO_DEVICE_IDX);
+ }
+
+ @Override
+ public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
+ if (cached == null) {
+ cached = ioManager.getFileReference(DEFAULT_IO_DEVICE_IDX, getPath());
+ }
+ return cached;
+ }
+}