[ASTERIXDB-3144][RT] Implement Static Partitioning
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Implement static partitioning based on storage/compute
partitions map.
- Fix LSMPrimaryInsertOperatorNodePushable state tracking so it
works correctly across multiple storage partitions.
Change-Id: Ieca7ffb0f48e16fba4dc5beb0868c1ef8ac9245e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17509
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index a2d99a0..e4247f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -157,7 +157,7 @@
requestTracker = new RequestTracker(this);
configValidator = configValidatorFactory.create();
this.adapterFactoryService = adapterFactoryService;
- dataPartitioningProvider = new DataPartitioningProvider(this);
+ dataPartitioningProvider = DataPartitioningProvider.create(this);
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index a2a3b48..53b9294 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -33,8 +33,10 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.PartitioningScheme;
import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
@@ -231,6 +233,9 @@
MetadataProperties metadataProperties = mockMetadataProperties();
Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties);
+ StorageProperties storageProperties = mockStorageProperties();
+ Mockito.when(ccApplicationContext.getStorageProperties()).thenReturn(storageProperties);
+
ResourceIdManager resourceIdManager = new ResourceIdManager(csm);
Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager);
@@ -258,6 +263,12 @@
return metadataProperties;
}
+ private StorageProperties mockStorageProperties() {
+ StorageProperties storageProperties = Mockito.mock(StorageProperties.class);
+ Mockito.when(storageProperties.getPartitioningScheme()).thenReturn(PartitioningScheme.DYNAMIC);
+ return storageProperties;
+ }
+
private NcLocalCounters mockLocalCounters() {
final NcLocalCounters localCounters = Mockito.mock(NcLocalCounters.class);
Mockito.when(localCounters.getMaxJobId()).thenReturn(1000L);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
new file mode 100644
index 0000000..1d11c30
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ComputePartition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+public class ComputePartition {
+ private final String nodeId;
+ private final int id;
+
+ public ComputePartition(int id, String nodeId) {
+ this.id = id;
+ this.nodeId = nodeId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public int getId() {
+ return id;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
new file mode 100644
index 0000000..b58c39f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/SplitComputeLocations.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class SplitComputeLocations {
+
+ private final IFileSplitProvider splitsProvider;
+ private final AlgebricksPartitionConstraint constraints;
+
+ public SplitComputeLocations(IFileSplitProvider splitsProvider, AlgebricksPartitionConstraint constraints) {
+ this.splitsProvider = splitsProvider;
+ this.constraints = constraints;
+ }
+
+ public IFileSplitProvider getSplitsProvider() {
+ return splitsProvider;
+ }
+
+ public AlgebricksPartitionConstraint getConstraints() {
+ return constraints;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
new file mode 100644
index 0000000..874371e
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.utils.StorageConstants;
+
+public class StorageComputePartitionsMap {
+
+ private final Map<Integer, ComputePartition> stoToComputeLocation = new HashMap<>();
+
+ public void addStoragePartition(int stoPart, ComputePartition compute) {
+ stoToComputeLocation.put(stoPart, compute);
+ }
+
+ public int[][] getComputeToStorageMap(boolean metadataDataset) {
+ Map<Integer, List<Integer>> computeToStoragePartitions = new HashMap<>();
+ if (metadataDataset) {
+ final int computePartitionIdForMetadata = 0;
+ computeToStoragePartitions.put(computePartitionIdForMetadata,
+ Collections.singletonList(computePartitionIdForMetadata));
+ } else {
+ for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
+ ComputePartition computePartition = getComputePartition(i);
+ int computeId = computePartition.getId();
+ List<Integer> storagePartitions =
+ computeToStoragePartitions.computeIfAbsent(computeId, k -> new ArrayList<>());
+ storagePartitions.add(i);
+ }
+ }
+ int[][] computerToStoArray = new int[computeToStoragePartitions.size()][];
+ for (Map.Entry<Integer, List<Integer>> integerListEntry : computeToStoragePartitions.entrySet()) {
+ computerToStoArray[integerListEntry.getKey()] =
+ integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+ }
+ return computerToStoArray;
+ }
+
+ public ComputePartition getComputePartition(int storagePartition) {
+ return stoToComputeLocation.get(storagePartition);
+ }
+
+ public static StorageComputePartitionsMap computePartitionsMap(IClusterStateManager clusterStateManager) {
+ ClusterPartition metadataPartition = clusterStateManager.getMetadataPartition();
+ Map<Integer, ClusterPartition> clusterPartitions = clusterStateManager.getClusterPartitions();
+ StorageComputePartitionsMap newMap = new StorageComputePartitionsMap();
+ newMap.addStoragePartition(metadataPartition.getPartitionId(),
+ new ComputePartition(metadataPartition.getPartitionId(), metadataPartition.getActiveNodeId()));
+ int storagePartitionsPerComputePartition = StorageConstants.NUM_STORAGE_PARTITIONS / clusterPartitions.size();
+ int storagePartitionId = 0;
+ int lastComputePartition = 1;
+ int remainingStoragePartition = StorageConstants.NUM_STORAGE_PARTITIONS % clusterPartitions.size();
+ for (Map.Entry<Integer, ClusterPartition> cp : clusterPartitions.entrySet()) {
+ ClusterPartition clusterPartition = cp.getValue();
+ for (int i = 0; i < storagePartitionsPerComputePartition; i++) {
+ newMap.addStoragePartition(storagePartitionId,
+ new ComputePartition(clusterPartition.getPartitionId(), clusterPartition.getActiveNodeId()));
+ storagePartitionId++;
+ }
+ if (lastComputePartition == clusterPartitions.size() && remainingStoragePartition != 0) {
+ // assign all remaining partitions to last compute partition
+ for (int k = 0; k < remainingStoragePartition; k++) {
+ newMap.addStoragePartition(storagePartitionId, new ComputePartition(
+ clusterPartition.getPartitionId(), clusterPartition.getActiveNodeId()));
+ storagePartitionId++;
+ }
+ }
+ lastComputePartition++;
+ }
+ return newMap;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 2d231d3..c26fe76 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -46,6 +46,8 @@
public static final String DEFAULT_COMPACTION_POLICY_NAME = ConcurrentMergePolicyFactory.NAME;
public static final String DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME = "correlated-prefix";
public static final Map<String, String> DEFAULT_COMPACTION_POLICY_PROPERTIES;
+ public static final int METADATA_PARTITION = -1;
+ public static final int NUM_STORAGE_PARTITIONS = 8;
/**
* The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 9ca87b8..3ac8a02 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -145,6 +145,7 @@
throws HyracksDataException, AlgebricksException {
this.isFeed = ExternalDataUtils.isFeed(configuration);
if (isFeed) {
+ //TODO(partitioning) make this code reuse DataPartitioningProvider
feedLogFileSplits = FeedUtils.splitsForAdapter(appCtx, ExternalDataUtils.getDatasetDataverse(configuration),
ExternalDataUtils.getFeedName(configuration), dataSourceFactory.getPartitionConstraint());
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index a8df92a..0b011d0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -933,7 +933,8 @@
public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
throws AlgebricksException {
- return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx, appCtx.getClusterStateManager());
+ return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, dataset, indexName).getSpiltsProvider()
+ .getFileSplits();
}
public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
@@ -1788,8 +1789,6 @@
}
public PartitioningProperties getPartitioningProperties(Dataset ds, String indexName) throws AlgebricksException {
- //TODO(partitioning) pass splits rather than mdTxnCtx?
- // FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, ds, indexName);
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index ec4c985..7257958 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -18,9 +18,6 @@
*/
package org.apache.asterix.metadata.utils;
-import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
-import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
-
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
@@ -31,78 +28,71 @@
import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-public class DataPartitioningProvider implements IDataPartitioningProvider {
+public abstract class DataPartitioningProvider implements IDataPartitioningProvider {
- private final ICcApplicationContext appCtx;
- private final PartitioningScheme scheme;
+ protected final ICcApplicationContext appCtx;
+ protected final ClusterStateManager clusterStateManager;
- public DataPartitioningProvider(ICcApplicationContext appCtx) {
+ DataPartitioningProvider(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
- scheme = appCtx.getStorageProperties().getPartitioningScheme();
+ this.clusterStateManager = (ClusterStateManager) appCtx.getClusterStateManager();
}
- public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
- if (scheme == DYNAMIC) {
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
- .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName);
- int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
- return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
- } else if (scheme == STATIC) {
- throw new NotImplementedException();
+ public static DataPartitioningProvider create(ICcApplicationContext appCtx) {
+ PartitioningScheme partitioningScheme = appCtx.getStorageProperties().getPartitioningScheme();
+ switch (partitioningScheme) {
+ case DYNAMIC:
+ return new DynamicDataPartitioningProvider(appCtx);
+ case STATIC:
+ return new StaticDataPartitioningProvider(appCtx);
+ default:
+ throw new IllegalStateException("unknown partitioning scheme: " + partitioningScheme);
}
- throw new IllegalStateException();
}
- public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
- String indexName) throws AlgebricksException {
- if (scheme == DYNAMIC) {
- FileSplit[] splits =
- SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, mdTxnCtx, appCtx.getClusterStateManager());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints =
- StoragePathUtil.splitProviderAndPartitionConstraints(splits);
- int[][] partitionsMap = getPartitionsMap(getNumPartitions(splitsAndConstraints.second));
- return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
- } else if (scheme == STATIC) {
- throw new NotImplementedException();
- }
- throw new IllegalStateException();
- }
+ public abstract PartitioningProperties getPartitioningProperties(DataverseName dataverseName);
+
+ public abstract PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+ String indexName) throws AlgebricksException;
public PartitioningProperties getPartitioningProperties(Feed feed) throws AsterixException {
- if (scheme == DYNAMIC) {
- IClusterStateManager csm = appCtx.getClusterStateManager();
- AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
- Set<String> nodes = new TreeSet<>(Arrays.asList(allCluster.getLocations()));
- AlgebricksAbsolutePartitionConstraint locations =
- new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
- FileSplit[] feedLogFileSplits =
- FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
- StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
- int[][] partitionsMap = getPartitionsMap(getNumPartitions(spC.second));
- return PartitioningProperties.of(spC.first, spC.second, partitionsMap);
- } else if (scheme == STATIC) {
- throw new NotImplementedException();
- }
- throw new IllegalStateException();
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ AlgebricksAbsolutePartitionConstraint allCluster = csm.getClusterLocations();
+ Set<String> nodes = new TreeSet<>(Arrays.asList(allCluster.getLocations()));
+ AlgebricksAbsolutePartitionConstraint locations =
+ new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
+ FileSplit[] feedLogFileSplits =
+ FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), feed.getFeedName(), locations);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
+ StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+ int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(spC.second));
+ return PartitioningProperties.of(spC.first, spC.second, partitionsMap);
}
- private static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
+ protected static int getNumberOfPartitions(Dataset ds) {
+ return MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId())
+ ? MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS
+ : StorageConstants.NUM_STORAGE_PARTITIONS;
+ }
+
+ protected static int getLocationsCount(AlgebricksPartitionConstraint constraint) {
if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
return ((AlgebricksCountPartitionConstraint) constraint).getCount();
} else {
@@ -110,7 +100,7 @@
}
}
- private static int[][] getPartitionsMap(int numPartitions) {
+ protected static int[][] getOneToOnePartitionsMap(int numPartitions) {
int[][] map = new int[numPartitions][1];
for (int i = 0; i < numPartitions; i++) {
map[i] = new int[] { i };
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
new file mode 100644
index 0000000..95dae4a
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DynamicDataPartitioningProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DynamicDataPartitioningProvider extends DataPartitioningProvider {
+
+ public DynamicDataPartitioningProvider(ICcApplicationContext appCtx) {
+ super(appCtx);
+ }
+
+ @Override
+ public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints = SplitsAndConstraintsUtil
+ .getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), dataverseName);
+ int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
+ return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
+ }
+
+ @Override
+ public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+ String indexName) throws AlgebricksException {
+ FileSplit[] splits =
+ SplitsAndConstraintsUtil.getIndexSplits(ds, indexName, mdTxnCtx, appCtx.getClusterStateManager());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraints =
+ StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+ int[][] partitionsMap = getOneToOnePartitionsMap(getLocationsCount(splitsAndConstraints.second));
+ return PartitioningProperties.of(splitsAndConstraints.first, splitsAndConstraints.second, partitionsMap);
+ }
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
new file mode 100644
index 0000000..eaafc6c
--- /dev/null
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/StaticDataPartitioningProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.ComputePartition;
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.cluster.SplitComputeLocations;
+import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.MappedFileSplit;
+import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class StaticDataPartitioningProvider extends DataPartitioningProvider {
+
+ public StaticDataPartitioningProvider(ICcApplicationContext appCtx) {
+ super(appCtx);
+ }
+
+ @Override
+ public PartitioningProperties getPartitioningProperties(DataverseName dataverseName) {
+ SplitComputeLocations dataverseSplits = getDataverseSplits(dataverseName);
+ StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+ int[][] partitionsMap = partitionMap.getComputeToStorageMap(false);
+ return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(),
+ partitionsMap);
+ }
+
+ @Override
+ public PartitioningProperties getPartitioningProperties(MetadataTransactionContext mdTxnCtx, Dataset ds,
+ String indexName) throws AlgebricksException {
+ SplitComputeLocations dataverseSplits = getDatasetSplits(ds, indexName);
+ StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+ int[][] partitionsMap = partitionMap
+ .getComputeToStorageMap(MetadataIndexImmutableProperties.isMetadataDataset(ds.getDatasetId()));
+ return PartitioningProperties.of(dataverseSplits.getSplitsProvider(), dataverseSplits.getConstraints(),
+ partitionsMap);
+ }
+
+ private SplitComputeLocations getDataverseSplits(DataverseName dataverseName) {
+ List<FileSplit> splits = new ArrayList<>();
+ List<String> locations = new ArrayList<>();
+ Set<Integer> uniqueLocations = new HashSet<>();
+ StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+ for (int i = 0; i < StorageConstants.NUM_STORAGE_PARTITIONS; i++) {
+ File f = new File(StoragePathUtil.prepareStoragePartitionPath(i),
+ StoragePathUtil.prepareDataverseName(dataverseName));
+ ComputePartition computePartition = partitionMap.getComputePartition(i);
+ splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0));
+ if (!uniqueLocations.contains(computePartition.getId())) {
+ locations.add(computePartition.getNodeId());
+ }
+ uniqueLocations.add(computePartition.getId());
+ }
+ IFileSplitProvider splitProvider = StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0]));
+ AlgebricksPartitionConstraint constraints =
+ new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
+ return new SplitComputeLocations(splitProvider, constraints);
+ }
+
+ private SplitComputeLocations getDatasetSplits(Dataset dataset, String indexName) {
+ List<FileSplit> splits = new ArrayList<>();
+ List<String> locations = new ArrayList<>();
+ Set<Integer> uniqueLocations = new HashSet<>();
+ StorageComputePartitionsMap partitionMap = clusterStateManager.getStorageComputeMap();
+ final int datasetPartitons = getNumberOfPartitions(dataset);
+ boolean metadataDataset = MetadataIndexImmutableProperties.isMetadataDataset(dataset.getDatasetId());
+ for (int i = 0; i < datasetPartitons; i++) {
+ int storagePartition = metadataDataset ? StorageConstants.METADATA_PARTITION : i;
+ final String relPath = StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+ dataset.getDatasetName(), indexName, dataset.getRebalanceCount());
+ File f = new File(StoragePathUtil.prepareStoragePartitionPath(storagePartition), relPath);
+ ComputePartition computePartition = partitionMap.getComputePartition(storagePartition);
+ splits.add(new MappedFileSplit(computePartition.getNodeId(), f.getPath(), 0));
+ if (!uniqueLocations.contains(computePartition.getId())) {
+ locations.add(computePartition.getNodeId());
+ }
+ uniqueLocations.add(computePartition.getId());
+ }
+ IFileSplitProvider splitProvider = StoragePathUtil.splitProvider(splits.toArray(new FileSplit[0]));
+ AlgebricksPartitionConstraint constraints =
+ new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[0]));
+ return new SplitComputeLocations(splitProvider, constraints);
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 7e51ec1..3762e82 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -65,6 +65,8 @@
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
@@ -82,11 +84,10 @@
private final IFrameOperationCallback[] frameOpCallbacks;
private boolean flushedPartialTuples;
- private int currentTupleIdx;
- private int lastFlushedTupleIdx;
-
private final PermutingFrameTupleReference keyTuple;
private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
+ private final IntSet processedTuples = new IntOpenHashSet();
+ private final IntSet flushedTuples = new IntOpenHashSet();
private final SourceLocation sourceLoc;
public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
@@ -142,7 +143,7 @@
@Override
public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index)
throws HyracksDataException {
- if (index < currentTupleIdx) {
+ if (processedTuples.contains(index)) {
// already processed; skip
return;
}
@@ -174,7 +175,7 @@
throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
}
- currentTupleIdx = index + 1;
+ processedTuples.add(index);
}
@Override
@@ -197,15 +198,14 @@
@Override
public void open() throws HyracksDataException {
- currentTupleIdx = 0;
- lastFlushedTupleIdx = 0;
flushedPartialTuples = false;
accessor = new FrameTupleAccessor(inputRecDesc);
writeBuffer = new VSizeFrame(ctx);
try {
INcApplicationContext appCtx =
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
-
+ writer.open();
+ writerOpen = true;
for (int i = 0; i < partitions.length; i++) {
IIndexDataflowHelper indexHelper = indexHelpers[i];
indexHelper.open();
@@ -224,8 +224,6 @@
new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
}
- writer.open();
- writerOpen = true;
modCallbacks[i] =
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
@@ -283,9 +281,9 @@
FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
}
- currentTupleIdx = 0;
- lastFlushedTupleIdx = 0;
flushedPartialTuples = false;
+ processedTuples.clear();
+ flushedTuples.clear();
}
/**
@@ -293,15 +291,17 @@
*/
@Override
public void flushPartialFrame() throws HyracksDataException {
- if (lastFlushedTupleIdx == currentTupleIdx) {
- //nothing to flush
- return;
- }
- for (int i = lastFlushedTupleIdx; i < currentTupleIdx; i++) {
- FrameUtils.appendToWriter(writer, appender, accessor, i);
+ IntList tuplesToFlush = new IntArrayList();
+ processedTuples.iterator().forEachRemaining(tIdx -> {
+ if (!flushedTuples.contains(tIdx)) {
+ tuplesToFlush.add(tIdx);
+ flushedTuples.add(tIdx);
+ }
+ });
+ for (int i = 0; i < tuplesToFlush.size(); i++) {
+ FrameUtils.appendToWriter(writer, appender, accessor, tuplesToFlush.getInt(i));
}
appender.write(writer, true);
- lastFlushedTupleIdx = currentTupleIdx;
flushedPartialTuples = true;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index d3c87ff..984b3ce 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -34,12 +34,15 @@
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.StorageComputePartitionsMap;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.config.IOption;
@@ -79,6 +82,7 @@
private ICcApplicationContext appCtx;
private ClusterPartition metadataPartition;
private boolean rebalanceRequired;
+ private StorageComputePartitionsMap storageComputePartitionsMap;
@Override
public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -86,7 +90,14 @@
node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions();
clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions();
currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName();
- metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
+ PartitioningScheme partitioningScheme = appCtx.getStorageProperties().getPartitioningScheme();
+ if (partitioningScheme == PartitioningScheme.DYNAMIC) {
+ metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
+ } else {
+ final ClusterPartition fixedMetadataPartition = new ClusterPartition(StorageConstants.METADATA_PARTITION,
+ appCtx.getMetadataProperties().getMetadataNodeName(), 0);
+ metadataPartition = fixedMetadataPartition;
+ }
lifecycleCoordinator = appCtx.getNcLifecycleCoordinator();
lifecycleCoordinator.bindTo(this);
}
@@ -299,6 +310,9 @@
clusterActiveLocations.removeAll(pendingRemoval);
clusterPartitionConstraint =
new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
+ if (appCtx.getStorageProperties().getPartitioningScheme() == PartitioningScheme.STATIC) {
+ storageComputePartitionsMap = StorageComputePartitionsMap.computePartitionsMap(this);
+ }
}
@Override
@@ -489,6 +503,10 @@
return nodeIds.stream().anyMatch(failedNodes::contains);
}
+ public synchronized StorageComputePartitionsMap getStorageComputeMap() {
+ return storageComputePartitionsMap;
+ }
+
private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index bb3cde5..1e81346 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -211,7 +211,7 @@
public void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
boolean resourceExists = resourceFile.getFile().exists();
- if (resourceExists) {
+ if (isReplicationEnabled && resourceExists) {
try {
createReplicationJob(ReplicationOperation.DELETE, resourceFile);
} catch (Exception e) {