[ASTERIXDB-3177][CONF] Store ingestion logs outside of data storage dir

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Store ingestion logs on the first iodevice of each node rather
  than inside the data storage dir.
- Move columnar dataset rebalance test to rebalance test suite.

Change-Id: Ia5f836fa99f3c982b6420b092d40d88f5f6429c1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17517
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
index 511e27b..a4158ea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
@@ -99,5 +99,10 @@
         <output-dir compare="Text">all_datasets</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="column">
+      <compilation-unit name="rebalance">
+        <output-dir compare="Text">rebalance</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 08f71f7..299e32a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -16274,11 +16274,6 @@
         <output-dir compare="Text">secondary-index/create-index/after-upsert-with-meta</output-dir>
       </compilation-unit>
     </test-case>
-    <test-case FilePath="column">
-      <compilation-unit name="rebalance">
-        <output-dir compare="Text">rebalance</output-dir>
-      </compilation-unit>
-    </test-case>
   </test-group>
   <test-group name="copy">
     <test-case FilePath="copy">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index c26fe76..5d6322b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -31,6 +31,7 @@
 public class StorageConstants {
 
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
+    public static final String INGESTION_LOGS_DIR_NAME = "ingestion_logs";
     public static final String PARTITION_DIR_PREFIX = "partition_";
     /**
      * Any file that shares the same directory as the LSM index files must
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index c87f368..a9ed066 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.DefaultIoDeviceFileSplit;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.MappedFileSplit;
@@ -65,11 +66,19 @@
         return new MappedFileSplit(partition.getActiveNodeId(), relativePath, partition.getIODeviceNum());
     }
 
+    public static FileSplit getDefaultIoDeviceFileSpiltForNode(String nodeId, String relativePath) {
+        return new DefaultIoDeviceFileSplit(nodeId, relativePath);
+    }
+
     public static String prepareStoragePartitionPath(int partitonId) {
         return Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partitonId)
                 .toString();
     }
 
+    public static String prepareIngestionLogPath() {
+        return Paths.get(StorageConstants.INGESTION_LOGS_DIR_NAME).toString();
+    }
+
     public static String prepareDataverseIndexName(DataverseName dataverseName, String datasetName, String idxName,
             long rebalanceCount) {
         return prepareDataverseComponentName(dataverseName, prepareFullIndexName(datasetName, idxName, rebalanceCount));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 3ac8a02..214f3b3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -94,7 +94,8 @@
         }
         if (isFeed) {
             if (feedLogManager == null) {
-                feedLogManager = FeedUtils.getFeedLogManager(ctx, partition, feedLogFileSplits);
+                feedLogManager =
+                        new FeedLogManager(feedLogFileSplits[partition].getFileReference(ctx.getIoManager()).getFile());
             }
             feedLogManager.touch();
         }
@@ -146,7 +147,7 @@
         this.isFeed = ExternalDataUtils.isFeed(configuration);
         if (isFeed) {
             //TODO(partitioning) make this code reuse DataPartitioningProvider
-            feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, ExternalDataUtils.getDatasetDataverse(configuration),
+            feedLogFileSplits = FeedUtils.splitsForAdapter(ExternalDataUtils.getDatasetDataverse(configuration),
                     ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
         }
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 0a91ae8..5baefcb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -28,7 +28,6 @@
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -45,11 +44,8 @@
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.util.IntSerDeUtils;
 
@@ -84,46 +80,28 @@
     private FeedUtils() {
     }
 
-    public static FileSplit splitsForAdapter(DataverseName dataverseName, String feedName, String nodeName,
-            ClusterPartition partition) {
+    private static FileSplit splitsForAdapter(DataverseName dataverseName, String feedName, String nodeName) {
         String relPathFile = StoragePathUtil.prepareDataverseComponentName(dataverseName, feedName);
-        String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(partition.getPartitionId());
+        String storagePartitionPath = StoragePathUtil.prepareIngestionLogPath();
         // Note: feed adapter instances in a single node share the feed logger
-        // format: 'storage dir name'/partition_#/dataverse_part1[^dataverse_part2[...]]/feed/node
+        // format: 'ingestion logs dir name'/dataverse_part1[^dataverse_part2[...]]/feed/node
         File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
-        return StoragePathUtil.getFileSplitForClusterPartition(partition, f.getPath());
+        return StoragePathUtil.getDefaultIoDeviceFileSpiltForNode(nodeName, f.getPath());
     }
 
-    public static FileSplit[] splitsForAdapter(ICcApplicationContext appCtx, DataverseName dataverseName,
-            String feedName, AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
+    public static FileSplit[] splitsForAdapter(DataverseName dataverseName, String feedName,
+            AlgebricksPartitionConstraint partitionConstraints) throws AsterixException {
         if (partitionConstraints.getPartitionConstraintType() == PartitionConstraintType.COUNT) {
             throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
         }
         String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
         List<FileSplit> splits = new ArrayList<>();
         for (String nd : locations) {
-            splits.add(splitsForAdapter(dataverseName, feedName, nd,
-                    appCtx.getClusterStateManager().getNodePartitions(nd)[0]));
+            splits.add(splitsForAdapter(dataverseName, feedName, nd));
         }
         return splits.toArray(new FileSplit[] {});
     }
 
-    public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) {
-        return ioManager.getFileReference(ioDeviceId, relativePath);
-    }
-
-    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition,
-            FileSplit[] feedLogFileSplits) throws HyracksDataException {
-        return new FeedLogManager(
-                FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIoManager()).getFile());
-    }
-
-    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit)
-            throws HyracksDataException {
-        return new FeedLogManager(
-                FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile());
-    }
-
     public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta)
             throws HyracksDataException {
         // read the message and reduce the number of tuples
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index 7257958..63af664a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -79,7 +79,7 @@
         AlgebricksAbsolutePartitionConstraint locations =
                 new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
         FileSplit[] feedLogFileSplits =
-                FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
+                FeedUtils.splitsForAdapter(feed.getDataverseName(), feed.getFeedName(), locations);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
                 StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
         int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(spC.second));
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java
new file mode 100644
index 0000000..9f88f31
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/DefaultIoDeviceFileSplit.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.io;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A FileSplit that is mapped to the default IO device
+ */
+public class DefaultIoDeviceFileSplit extends MappedFileSplit {
+
+    public static final int DEFAULT_IO_DEVICE_IDX = 0;
+    private static final long serialVersionUID = 1L;
+    private transient FileReference cached;
+
+    /**
+     * Construct a managed File split that is mapped to the default IO device
+     * @param node
+     * @param path
+     */
+    public DefaultIoDeviceFileSplit(String node, String path) {
+        super(node, path, DEFAULT_IO_DEVICE_IDX);
+    }
+
+    @Override
+    public FileReference getFileReference(IIOManager ioManager) throws HyracksDataException {
+        if (cached == null) {
+            cached = ioManager.getFileReference(DEFAULT_IO_DEVICE_IDX, getPath());
+        }
+        return cached;
+    }
+}