[NO ISSUE][MTD] Customizable dataset compaction policy in metadata
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Allow product extensions to customize how dataset's
compaction policy is stored in the metadata
Change-Id: I0216af5eabdf5ff269ba2d3feccf1371d273315b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5224
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index dae6152..74f5076 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -25,6 +25,7 @@
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.Calendar;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -68,6 +69,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.compression.CompressionManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -135,20 +137,9 @@
String nodeGroupName =
((AString) datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX))
.getStringValue();
- String compactionPolicy = ((AString) datasetRecord
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue();
- IACursor cursor = ((AOrderedList) datasetRecord
- .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX))
- .getCursor();
- Map<String, String> compactionPolicyProperties = new LinkedHashMap<>();
- String key;
- String value;
- while (cursor.next()) {
- ARecord field = (ARecord) cursor.get();
- key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX)).getStringValue();
- value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX)).getStringValue();
- compactionPolicyProperties.put(key, value);
- }
+
+ Pair<String, Map<String, String>> compactionPolicy = readCompactionPolicy(datasetType, datasetRecord);
+
switch (datasetType) {
case INTERNAL: {
ARecord datasetDetailsRecord = (ARecord) datasetRecord
@@ -159,7 +150,7 @@
PartitioningStrategy partitioningStrategy = PartitioningStrategy.valueOf(((AString) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONSTRATEGY_FIELD_INDEX))
.getStringValue());
- cursor = ((AOrderedList) datasetDetailsRecord
+ IACursor cursor = ((AOrderedList) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.INTERNAL_DETAILS_ARECORD_PARTITIONKEY_FIELD_INDEX))
.getCursor();
List<List<String>> partitioningKey = new ArrayList<>();
@@ -220,15 +211,15 @@
String adapter = ((AString) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_DATASOURCE_ADAPTER_FIELD_INDEX))
.getStringValue();
- cursor = ((AOrderedList) datasetDetailsRecord
+ IACursor cursor = ((AOrderedList) datasetDetailsRecord
.getValueByPos(MetadataRecordTypes.EXTERNAL_DETAILS_ARECORD_PROPERTIES_FIELD_INDEX))
.getCursor();
Map<String, String> properties = new HashMap<>();
while (cursor.next()) {
ARecord field = (ARecord) cursor.get();
- key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
+ String key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
.getStringValue();
- value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
+ String value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
.getStringValue();
properties.put(key, value);
}
@@ -262,10 +253,34 @@
String compressionScheme = getCompressionScheme(datasetRecord);
return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName,
- nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType,
+ nodeGroupName, compactionPolicy.first, compactionPolicy.second, datasetDetails, hints, datasetType,
datasetId, pendingOp, rebalanceCount, compressionScheme);
}
+ protected Pair<String, Map<String, String>> readCompactionPolicy(DatasetType datasetType, ARecord datasetRecord) {
+
+ String compactionPolicy = ((AString) datasetRecord
+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX)).getStringValue();
+ AOrderedList compactionPolicyPropertiesList = ((AOrderedList) datasetRecord
+ .getValueByPos(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX));
+
+ Map<String, String> compactionPolicyProperties;
+ if (compactionPolicyPropertiesList.size() > 0) {
+ compactionPolicyProperties = new LinkedHashMap<>();
+ for (IACursor cursor = compactionPolicyPropertiesList.getCursor(); cursor.next();) {
+ ARecord field = (ARecord) cursor.get();
+ String key = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_NAME_FIELD_INDEX))
+ .getStringValue();
+ String value = ((AString) field.getValueByPos(MetadataRecordTypes.PROPERTIES_VALUE_FIELD_INDEX))
+ .getStringValue();
+ compactionPolicyProperties.put(key, value);
+ }
+ } else {
+ compactionPolicyProperties = Collections.emptyMap();
+ }
+ return new Pair<>(compactionPolicy, compactionPolicyProperties);
+ }
+
private long getRebalanceCount(ARecord datasetRecord) {
// Read the rebalance count if there is one.
int rebalanceCountIndex =
@@ -342,29 +357,9 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_GROUPNAME_FIELD_INDEX, fieldValue);
- // write field 6
- fieldValue.reset();
- aString.setValue(dataset.getCompactionPolicy());
- stringSerde.serialize(aString, fieldValue.getDataOutput());
- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue);
-
- // write field 7
- listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE
- .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]);
- if (dataset.getCompactionPolicyProperties() != null) {
- for (Map.Entry<String, String> property : dataset.getCompactionPolicyProperties().entrySet()) {
- String name = property.getKey();
- String value = property.getValue();
- itemValue.reset();
- DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(),
- MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
- listBuilder.addItem(itemValue);
- }
- }
- fieldValue.reset();
- listBuilder.write(fieldValue.getDataOutput(), true);
- recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX,
- fieldValue);
+ // write field 6/7
+ writeCompactionPolicy(dataset.getDatasetType(), dataset.getCompactionPolicy(),
+ dataset.getCompactionPolicyProperties(), listBuilder, itemValue);
// write field 8/9
fieldValue.reset();
@@ -414,6 +409,34 @@
return tuple;
}
+ protected void writeCompactionPolicy(DatasetType datasetType, String compactionPolicy,
+ Map<String, String> compactionPolicyProperties, OrderedListBuilder listBuilder,
+ ArrayBackedValueStorage itemValue) throws HyracksDataException {
+ // write field 6
+ fieldValue.reset();
+ aString.setValue(compactionPolicy);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_FIELD_INDEX, fieldValue);
+
+ // write field 7
+ listBuilder.reset((AOrderedListType) MetadataRecordTypes.DATASET_RECORDTYPE
+ .getFieldTypes()[MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX]);
+ if (compactionPolicyProperties != null && !compactionPolicyProperties.isEmpty()) {
+ for (Map.Entry<String, String> property : compactionPolicyProperties.entrySet()) {
+ String name = property.getKey();
+ String value = property.getValue();
+ itemValue.reset();
+ DatasetUtil.writePropertyTypeRecord(name, value, itemValue.getDataOutput(),
+ MetadataRecordTypes.COMPACTION_POLICY_PROPERTIES_RECORDTYPE);
+ listBuilder.addItem(itemValue);
+ }
+ }
+ fieldValue.reset();
+ listBuilder.write(fieldValue.getDataOutput(), true);
+ recordBuilder.addField(MetadataRecordTypes.DATASET_ARECORD_COMPACTION_POLICY_PROPERTIES_FIELD_INDEX,
+ fieldValue);
+ }
+
/**
* Keep protected to allow other extensions to add additional fields
*