fix merge issues
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index 144131f..baf16de 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -112,11 +112,12 @@
// If any of the secondary fields are nullable, then add a select op that filters nulls.
AlgebricksMetaOperatorDescriptor selectOp = null;
if (anySecondaryKeyIsNullable) {
- selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys,RIDScanOpAndConstraints.second);
+ selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
}
// Sort by secondary keys.
ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc,RIDScanOpAndConstraints.second);
+ AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
// Create secondary BTree bulk load op.
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
spec,
@@ -199,3 +200,4 @@
}
}
}
+
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index 60a4451..5da336f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -19,6 +19,9 @@
import java.io.IOException;
import java.util.List;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
+import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
+import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
@@ -28,22 +31,18 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
-import edu.uci.ics.asterix.external.adapter.factory.HiveAdapterFactory;
-import edu.uci.ics.asterix.external.data.operator.ExternalDataIndexingOperatorDescriptor;
-import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
+import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Dataset;
-import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
-import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.runtime.evaluators.functions.AndDescriptor;
@@ -51,7 +50,6 @@
import edu.uci.ics.asterix.runtime.evaluators.functions.NotDescriptor;
import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
@@ -85,6 +83,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -92,6 +91,7 @@
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+
@SuppressWarnings("rawtypes")
// TODO: We should eventually have a hierarchy of classes that can create all
// possible index job specs,
@@ -109,11 +109,11 @@
protected ISerializerDeserializer payloadSerde;
protected IFileSplitProvider primaryFileSplitProvider;
protected AlgebricksPartitionConstraint primaryPartitionConstraint;
+ protected List<String> secondaryKeyFields;
protected IFileSplitProvider secondaryFileSplitProvider;
protected AlgebricksPartitionConstraint secondaryPartitionConstraint;
protected String secondaryIndexName;
protected boolean anySecondaryKeyIsNullable = false;
-
protected long numElementsHint;
protected IBinaryComparatorFactory[] primaryComparatorFactories;
protected int[] primaryBloomFilterKeyFields;
@@ -122,7 +122,6 @@
protected int[] secondaryBloomFilterKeyFields;
protected RecordDescriptor secondaryRecDesc;
protected ICopyEvaluatorFactory[] secondaryFieldAccessEvalFactories;
-
protected IAsterixPropertiesProvider propertiesProvider;
// Prevent public construction. Should be created via createIndexCreator().
@@ -166,37 +165,68 @@
public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
protected void init(CompiledCreateIndexStatement createIndexStmt, AqlMetadataProvider metadataProvider)
- throws AsterixException, AlgebricksException {
- this.metadataProvider = metadataProvider;
- dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
- : createIndexStmt.getDataverseName();
- datasetName = createIndexStmt.getDatasetName();
- secondaryIndexName = createIndexStmt.getIndexName();
- dataset = metadataProvider.findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AsterixException("Unknown dataset " + datasetName);
+ throws AsterixException, AlgebricksException {
+ this.metadataProvider = metadataProvider;
+ dataverseName = createIndexStmt.getDataverseName() == null ? metadataProvider.getDefaultDataverseName()
+ : createIndexStmt.getDataverseName();
+ datasetName = createIndexStmt.getDatasetName();
+ secondaryIndexName = createIndexStmt.getIndexName();
+ dataset = metadataProvider.findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AsterixException("Unknown dataset " + datasetName);
+ }
+ if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
+ //get external dataset details
+ ExternalDatasetDetails edsd = ((ExternalDatasetDetails)dataset.getDatasetDetails());
+ //get adapter name
+ String adapter = edsd.getAdapter();
+ //if not an hdfs adapter, throw an exception
+ if(!adapter.equals(HDFSAdapterFactory.HDFS_ADAPTER_NAME) && !adapter.equals(HiveAdapter.class.getName()))
+ {
+ throw new AsterixException("Cannot index an external dataset with adapter type(" + adapter + ").");
+ }
+ //get the item type
+ ARecordType externalItemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
+ //number of primary keys here depends on the file input, 3 for rcfiles and 2 for text and sequence files.
+ numPrimaryKeys = DatasetUtils.getExternalRIDSize(dataset);
+ itemType = createExternalItemTypeWithRID(externalItemType);
+ payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+ numSecondaryKeys = createIndexStmt.getKeyFields().size();
+ //splits and constraints <--They don't exist-->
+ primaryFileSplitProvider = null;
+ primaryPartitionConstraint = null;
+ //create secondary split and constraints
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForExternalDataset(dataverseName, datasetName,
+ secondaryIndexName);
+ secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+ secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+ // Must be called in this order.
+ setExternalRIDDescAndComparators();
+ setExternalSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+ numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+ }
+ else
+ {
+ itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
+ payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+ numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+ numSecondaryKeys = createIndexStmt.getKeyFields().size();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
+ primaryFileSplitProvider = primarySplitsAndConstraint.first;
+ primaryPartitionConstraint = primarySplitsAndConstraint.second;
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
+ secondaryIndexName);
+ secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
+ secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
+ // Must be called in this order.
+ setPrimaryRecDescAndComparators();
+ setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
+ numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
+ }
}
- if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
- }
- itemType = (ARecordType) metadataProvider.findType(dataset.getDataverseName(), dataset.getItemTypeName());
- payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
- numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
- numSecondaryKeys = createIndexStmt.getKeyFields().size();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName, datasetName);
- primaryFileSplitProvider = primarySplitsAndConstraint.first;
- primaryPartitionConstraint = primarySplitsAndConstraint.second;
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverseName, datasetName,
- secondaryIndexName);
- secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
- secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
- // Must be called in this order.
- setPrimaryRecDescAndComparators();
- setSecondaryRecDescAndComparators(createIndexStmt, metadataProvider);
- numElementsHint = metadataProvider.getCardinalityPerPartitionHint(dataset);
- }
protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
@@ -280,23 +310,235 @@
return keyProviderOp;
}
+protected ARecordType createExternalItemTypeWithRID(
+ ARecordType externalItemType) throws AsterixException {
+
+ String[] fieldsNames = new String[externalItemType.getFieldNames().length+numPrimaryKeys];
+ IAType[] fieldsTypes = new IAType[externalItemType.getFieldTypes().length+numPrimaryKeys];
+
+ //add RID fields names and types
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ fieldsNames[0] = "_file-number";
+ fieldsTypes[0] = BuiltinType.AINT32;
+ }
+ else
+ {
+ fieldsNames[0] = "_file-name";
+ fieldsTypes[0] = BuiltinType.ASTRING;
+ }
+ fieldsNames[1] = "_byte-location";
+ fieldsTypes[1] = BuiltinType.AINT64;
+ if(numPrimaryKeys == 3)
+ {
+ //add the row number for rc files
+ fieldsNames[2] = "_row-number";
+ fieldsTypes[2] = BuiltinType.AINT32;
+ }
+
+ //add the original fields names and types
+ for(int i=0; i < externalItemType.getFieldNames().length; i++)
+ {
+ fieldsNames[i+numPrimaryKeys] = externalItemType.getFieldNames()[i];
+ fieldsTypes[i+numPrimaryKeys] = externalItemType.getFieldTypes()[i];
+ }
+ return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+ }
+
+ protected void setExternalRIDDescAndComparators() throws AlgebricksException {
+
+ ISerializerDeserializer[] externalRecFields = new ISerializerDeserializer[itemType.getFieldNames().length];
+ ITypeTraits[] externalTypeTraits = new ITypeTraits[itemType.getFieldNames().length];
+
+ primaryComparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ primaryBloomFilterKeyFields = new int[numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
+ }
+ else
+ {
+ primaryComparatorFactories[0] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.ASTRING, true);
+ }
+ primaryComparatorFactories[1] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT64, true);
+
+ primaryBloomFilterKeyFields[0]=0;
+ primaryBloomFilterKeyFields[1]=1;
+
+ if(numPrimaryKeys == 3)
+ {
+ primaryComparatorFactories[2] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(BuiltinType.AINT32, true);
+ primaryBloomFilterKeyFields[2]=2;
+ }
+
+ for(int i=0; i < itemType.getFieldNames().length; i++)
+ {
+ externalRecFields[i] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[i]);
+ externalTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType.getFieldTypes()[i]);
+ }
+ primaryRecDesc = new RecordDescriptor(externalRecFields, externalTypeTraits);
+ }
+
+protected void setExternalSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt,
+ AqlMetadataProvider metadataProvider) throws AlgebricksException, AsterixException {
+ secondaryKeyFields = createIndexStmt.getKeyFields();
+ secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys+ numPrimaryKeys];
+ secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
+ secondaryBloomFilterKeyFields = new int[numSecondaryKeys];
+ ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
+ ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
+ ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+ ITypeTraitProvider typeTraitProvider = metadataProvider.getFormat().getTypeTraitProvider();
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider = metadataProvider.getFormat()
+ .getBinaryComparatorFactoryProvider();
+
+ for (int i = 0; i < numSecondaryKeys; i++) {
+ secondaryFieldAccessEvalFactories[i] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+ itemType, secondaryKeyFields.get(i), 0);
+ Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
+ IAType keyType = keyTypePair.first;
+ anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
+ ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
+ secondaryRecFields[i] = keySerde;
+ secondaryComparatorFactories[i] = comparatorFactoryProvider.getBinaryComparatorFactory(keyType, true);
+ secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(keyType);
+ secondaryBloomFilterKeyFields[i] = i;
+ }
+
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+ itemType, "_file-number", 0);
+ }
+ else
+ {
+ secondaryFieldAccessEvalFactories[numSecondaryKeys] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+ itemType, "_file-name", 0);
+ }
+ secondaryFieldAccessEvalFactories[numSecondaryKeys+1] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+ itemType, "_byte-location", 0);
+ if(numPrimaryKeys == 3)
+ {
+ secondaryFieldAccessEvalFactories[numSecondaryKeys+2] = metadataProvider.getFormat().getFieldAccessEvaluatorFactory(
+ itemType, "_row-number", 0);
+ }
+
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ secondaryRecFields[numSecondaryKeys + i] = primaryRecDesc.getFields()[i];
+ secondaryTypeTraits[numSecondaryKeys + i] = primaryRecDesc.getTypeTraits()[i];
+ secondaryComparatorFactories[numSecondaryKeys + i] = primaryComparatorFactories[i];
+ }
+ secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
+ }
+
+protected Pair<ExternalDataIndexingOperatorDescriptor, AlgebricksPartitionConstraint> createExternalIndexingOp(JobSpecification spec) throws Exception {
+ Pair<ExternalDataIndexingOperatorDescriptor,AlgebricksPartitionConstraint> indexingOpAndConstraints = metadataProvider.buildExternalDataIndexingRuntime(spec, itemType, dataset, NonTaggedDataFormat.INSTANCE);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexingOpAndConstraints.first,
+ indexingOpAndConstraints.second);
+ return indexingOpAndConstraints;
+ }
+
+protected AlgebricksMetaOperatorDescriptor createExternalAssignOp(JobSpecification spec) throws AlgebricksException {
+ int[] outColumns = new int[numSecondaryKeys + numPrimaryKeys];
+ int[] projectionList = new int[numSecondaryKeys + numPrimaryKeys];
+ for (int i = 0; i < numSecondaryKeys + numPrimaryKeys; i++) {
+ outColumns[i] = i;
+ projectionList[i] = i;
+ }
+
+ IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[secondaryFieldAccessEvalFactories.length];
+ for (int i = 0; i < secondaryFieldAccessEvalFactories.length; ++i) {
+ sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+ secondaryFieldAccessEvalFactories[i]);
+ }
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
+ new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
+ return asterixAssignOp;
+ }
+
+ protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
+ IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc,
+ AlgebricksPartitionConstraint partitionConstraints) {
+ int[] sortFields = new int[secondaryComparatorFactories.length];
+ for (int i = 0; i < secondaryComparatorFactories.length; i++) {
+ sortFields[i] = i;
+ }
+ ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
+ physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, partitionConstraints);
+ return sortOp;
+ }
+
+protected ARecordType createSecondaryItemType(ARecordType externalItemType, boolean isRCFile) throws AsterixException
+ {
+
+ String[] fieldsNames = new String[numSecondaryKeys+numPrimaryKeys];
+ IAType[] fieldsTypes = new IAType[numSecondaryKeys+numPrimaryKeys];
+
+ //first create the secondary index fields
+ for(int i=0; i<numSecondaryKeys; i++)
+ {
+ fieldsNames[i] = secondaryKeyFields.get(i);
+ try {
+ fieldsTypes[i] = externalItemType.getFieldType(fieldsNames[i]);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ throw new AsterixException(e);
+ }
+ }
+
+ //second add RID fields (File name or number and byte location)
+ if(AqlMetadataProvider.isOptimizeExternalIndexes())
+ {
+ fieldsNames[numSecondaryKeys] = "_file-number";
+ fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
+ }
+ else
+ {
+ fieldsNames[numSecondaryKeys] = "_file-name";
+ fieldsTypes[numSecondaryKeys] = BuiltinType.ASTRING;
+ }
+ fieldsNames[numSecondaryKeys+1] = "_byte-location";
+ fieldsTypes[numSecondaryKeys+1] = BuiltinType.AINT64;
+
+ if(isRCFile)
+ {
+ fieldsNames[numSecondaryKeys+2] = "_row-Number";
+ fieldsTypes[numSecondaryKeys+2] = BuiltinType.AINT32;
+ }
+
+ //return type
+ return new ARecordType(externalItemType.getTypeName(), fieldsNames, fieldsTypes, externalItemType.isOpen());
+ }
+
protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
// -Infinity
int[] lowKeyFields = null;
// +Infinity
int[] highKeyFields = null;
+ ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
+ JobId jobId = JobIdFactory.generateJobId();
+ metadataProvider.setJobId(jobId);
+ boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+ IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
+
+ ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
+ jobId, dataset.getDatasetId(), primaryBloomFilterKeyFields, txnSubsystemProvider,
+ ResourceType.LSM_BTREE);
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
- AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
primaryFileSplitProvider, primaryRecDesc.getTypeTraits(), primaryComparatorFactories,
primaryBloomFilterKeyFields, lowKeyFields, highKeyFields, true, true,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
- .getBloomFilterFalsePositiveRate()), false,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ storageProperties.getBloomFilterFalsePositiveRate()), false, searchCallbackFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
return primarySearchOp;
@@ -349,11 +591,10 @@
fieldPermutation[i] = i;
}
TreeIndexBulkLoadOperatorDescriptor treeIndexBulkLoadOp = new TreeIndexBulkLoadOperatorDescriptor(spec,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, secondaryFileSplitProvider,
- secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, secondaryBloomFilterKeyFields,
- fieldPermutation, fillFactor, false, numElementsHint, false, dataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
+ secondaryBloomFilterKeyFields, fieldPermutation, fillFactor, false, numElementsHint, false,
+ dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, treeIndexBulkLoadOp,
secondaryPartitionConstraint);
return treeIndexBulkLoadOp;
@@ -392,3 +633,4 @@
return asterixSelectOp;
}
}
+
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index c91903f..6ce694c 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -228,7 +228,7 @@
// If any of the secondary fields are nullable, then add a select op that filters nulls.
AlgebricksMetaOperatorDescriptor selectOp = null;
if (anySecondaryKeyIsNullable) {
- selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys,RIDScanOpAndConstraints.second);
+ selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
}
// Create secondary RTree bulk load op.
@@ -237,13 +237,13 @@
spec,
numNestedSecondaryKeyFields,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, AqlMetadataProvider.proposeLinearizer(
- keyType, secondaryComparatorFactories.length), storageProperties
- .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+ primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
+ LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
+ keyType, secondaryComparatorFactories.length), storageProperties
+ .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
// Create a hash partitioning connector
ExternalDatasetDetails edsd = (ExternalDatasetDetails)dataset.getDatasetDetails();
@@ -322,3 +322,4 @@
}
}
}
+
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 6509da4..81ce4f8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -29,7 +29,6 @@
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -1250,6 +1249,22 @@
throw new MetadataException(e);
}
}
+
+ @Override
+ public void addExternalDatasetFile(JobId jobId, ExternalFile externalFile)
+ throws MetadataException, RemoteException {
+ try {
+ // Insert into the 'externalFiles' dataset.
+ ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(true);
+ ITupleReference externalFileTuple = tupleReaderWriter.getTupleFromMetadataEntity(externalFile);
+ insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, externalFileTuple);
+ } catch (TreeIndexDuplicateKeyException e) {
+ throw new MetadataException("An externalFile with this number " + externalFile.getFileNumber()
+ + " already exists in dataset '" + externalFile.getDatasetName() + "' in dataverse '"+externalFile.getDataverseName()+"'.", e);
+ } catch (Exception e) {
+ throw new MetadataException(e);
+ }
+ }
@Override
@@ -1257,3 +1272,4 @@
return DatasetIdFactory.getMostRecentDatasetId();
}
}
+
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index f0c3a63..078b340d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -178,22 +178,6 @@
private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
private static Scheduler hdfsScheduler;
- public String getPropertyValue(String propertyName) {
- return config.get(propertyName);
- }
-
- public void setConfig(Map<String, String> config) {
- this.config = config;
- }
-
- public Map<String, String[]> getAllStores() {
- return stores;
- }
-
- public Map<String, String> getConfig() {
- return config;
- }
-
public AqlMetadataProvider(Dataverse defaultDataverse) {
this.defaultDataverse = defaultDataverse;
this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
@@ -780,15 +764,17 @@
}
ISearchOperationCallbackFactory searchCallbackFactory = null;
searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
- AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER;
+ AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
- appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
- typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
- AsterixRuntimeComponentsProvider.LSMBTREE_SECONDARY_PROVIDER, rtcProvider,
- rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
- searchCallbackFactory);
+ appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(), spPc.first,
+ typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
+ isSecondary ? new SecondaryIndexOperationTrackerProvider(
+ LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
+ : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
+ rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
+ searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
@@ -936,10 +922,10 @@
typeTraits, comparatorFactories, keyFields, new LSMRTreeDataflowHelperFactory(
valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMRTREE_PROVIDER, proposeLinearizer(
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
@@ -1162,10 +1148,10 @@
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER,
- AsterixRuntimeComponentsProvider.LSMBTREE_PRIMARY_PROVIDER, storageProperties
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
.getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
@@ -1924,3 +1910,4 @@
}
}
+