Took care of Vinayak's review.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@326 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 103e429..7e8ede1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -57,52 +57,4 @@
return spec;
}
-
- public static void main(String[] args) throws Exception {
- String host;
- String appName;
- String ddlFile;
-
- switch (args.length) {
- case 0: {
- host = "127.0.0.1";
- appName = "asterix";
- ddlFile = "/home/abehm/workspace/asterix/asterix-app/src/test/resources/demo0927/local/create-index.aql";
- System.out.println("No arguments specified, using defauls:");
- System.out.println("HYRACKS HOST: " + host);
- System.out.println("APPNAME: " + appName);
- System.out.println("DDLFILE: " + ddlFile);
- }
- break;
-
- case 3: {
- host = args[0];
- appName = args[1];
- ddlFile = args[2];
- }
- break;
-
- default: {
- System.out.println("USAGE:");
- System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
- System.out.println("ARG 2: Application Name (e.g., asterix)");
- System.out.println("ARG 3: DDL File");
- host = null;
- appName = null;
- ddlFile = null;
- System.exit(0);
- }
- break;
-
- }
-
- // int port = HyracksIntegrationUtil.DEFAULT_HYRACKS_CC_PORT;
-
- // AsterixJavaClient q = compileQuery(ddlFile, true, false, true);
-
- // long start = System.currentTimeMillis();
- // q.execute(port);
- // long end = System.currentTimeMillis();
- // System.err.println(start + " " + end + " " + (end - start));
- }
-}
+}
\ No newline at end of file
diff --git a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index 02b688b..7d1f64f 100644
--- a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -46,36 +46,32 @@
private final long transactionId;
- /*
- * TODO: Index operators should live in Hyracks. Right now, they are needed
- * here in Asterix as a hack to provide transactionIDs. The Asterix verions
- * of this operator will disappear and the operator will come from Hyracks
- * once the LSM/Recovery/Transactions world has been introduced.
- */
- public TreeIndexInsertUpdateDeleteOperatorDescriptor(JobSpecification spec,
- RecordDescriptor recDesc, IStorageManagerInterface storageManager,
- IIndexRegistryProvider<IIndex> indexRegistryProvider,
- IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
- IBinaryComparatorFactory[] comparatorFactories,
- int[] fieldPermutation, IndexOp op,
- IIndexDataflowHelperFactory dataflowHelperFactory,
- ITupleFilterFactory tupleFilterFactory,
- IOperationCallbackProvider opCallbackProvider, long transactionId) {
- super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider,
- fileSplitProvider, typeTraits, comparatorFactories,
- dataflowHelperFactory, tupleFilterFactory, opCallbackProvider);
- this.fieldPermutation = fieldPermutation;
- this.op = op;
- this.transactionId = transactionId;
- }
+ /**
+ * TODO: Index operators should live in Hyracks. Right now, they are needed
+ * here in Asterix as a hack to provide transactionIDs. The Asterix verions
+ * of this operator will disappear and the operator will come from Hyracks
+ * once the LSM/Recovery/Transactions world has been introduced.
+ */
+ public TreeIndexInsertUpdateDeleteOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+ IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+ IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+ IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, IndexOp op,
+ IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
+ IOperationCallbackProvider opCallbackProvider, long transactionId) {
+ super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, dataflowHelperFactory, tupleFilterFactory, opCallbackProvider);
+ this.fieldPermutation = fieldPermutation;
+ this.op = op;
+ this.transactionId = transactionId;
+ }
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
TransactionContext txnContext;
try {
- ITransactionManager transactionManager = ((AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
- .getApplicationObject()).getTransactionProvider().getTransactionManager();
+ ITransactionManager transactionManager = ((AsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject()).getTransactionProvider().getTransactionManager();
txnContext = transactionManager.getTransactionContext(transactionId);
} catch (ACIDException ae) {
throw new RuntimeException(" could not obtain context for invalid transaction id " + transactionId);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 856028c..d8e340c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -89,1011 +89,839 @@
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
-public class AqlMetadataProvider implements
- IMetadataProvider<AqlSourceId, String> {
- private final long txnId;
- private boolean isWriteTransaction;
- private final AqlCompiledMetadataDeclarations metadata;
+public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
+ private final long txnId;
+ private boolean isWriteTransaction;
+ private final AqlCompiledMetadataDeclarations metadata;
- public AqlMetadataProvider(long txnId, boolean isWriteTransaction,
- AqlCompiledMetadataDeclarations metadata) {
- this.txnId = txnId;
- this.isWriteTransaction = isWriteTransaction;
- this.metadata = metadata;
- }
+ public AqlMetadataProvider(long txnId, boolean isWriteTransaction, AqlCompiledMetadataDeclarations metadata) {
+ this.txnId = txnId;
+ this.isWriteTransaction = isWriteTransaction;
+ this.metadata = metadata;
+ }
- @Override
- public AqlDataSource findDataSource(AqlSourceId id)
- throws AlgebricksException {
- AqlSourceId aqlId = (AqlSourceId) id;
- return lookupSourceInMetadata(metadata, aqlId);
- }
+ @Override
+ public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
+ AqlSourceId aqlId = (AqlSourceId) id;
+ return lookupSourceInMetadata(metadata, aqlId);
+ }
- public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
- return metadata;
- }
+ public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
+ return metadata;
+ }
- public boolean isWriteTransaction() {
- return isWriteTransaction;
- }
+ public boolean isWriteTransaction() {
+ return isWriteTransaction;
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
- IDataSource<AqlSourceId> dataSource,
- List<LogicalVariable> scanVariables,
- List<LogicalVariable> projectVariables, boolean projectPushed,
- JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException {
- AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId()
- .getDatasetName());
- if (adecl == null) {
- throw new AlgebricksException("Unknown dataset "
- + dataSource.getId().getDatasetName());
- }
- switch (adecl.getDatasetType()) {
- case FEED:
- if (dataSource instanceof ExternalFeedDataSource) {
- return buildExternalDatasetScan(jobSpec, adecl, dataSource);
- } else {
- return buildInternalDatasetScan(jobSpec, adecl, dataSource,
- context);
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
+ IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
+ List<LogicalVariable> projectVariables, boolean projectPushed, JobGenContext context,
+ JobSpecification jobSpec) throws AlgebricksException {
+ AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId().getDatasetName());
+ if (adecl == null) {
+ throw new AlgebricksException("Unknown dataset " + dataSource.getId().getDatasetName());
+ }
+ switch (adecl.getDatasetType()) {
+ case FEED:
+ if (dataSource instanceof ExternalFeedDataSource) {
+ return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+ } else {
+ return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
+ }
- case INTERNAL: {
- return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
- }
+ case INTERNAL: {
+ return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
+ }
- case EXTERNAL: {
- return buildExternalDatasetScan(jobSpec, adecl, dataSource);
- }
+ case EXTERNAL: {
+ return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+ }
- default: {
- throw new IllegalArgumentException();
- }
- }
- }
+ default: {
+ throw new IllegalArgumentException();
+ }
+ }
+ }
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(
- JobSpecification jobSpec, AqlCompiledDatasetDecl acedl,
- IDataSource<AqlSourceId> dataSource, JobGenContext context)
- throws AlgebricksException {
- AqlSourceId asid = dataSource.getId();
- String datasetName = asid.getDatasetName();
- String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
+ AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource, JobGenContext context)
+ throws AlgebricksException {
+ AqlSourceId asid = dataSource.getId();
+ String datasetName = asid.getDatasetName();
+ String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
- try {
- return buildBtreeRuntime(metadata, context, jobSpec, datasetName,
- acedl, indexName, null, null, true, true);
- } catch (AlgebricksException e) {
- throw new AlgebricksException(e);
- }
- }
+ try {
+ return buildBtreeRuntime(metadata, context, jobSpec, datasetName, acedl, indexName, null, null, true, true);
+ } catch (AlgebricksException e) {
+ throw new AlgebricksException(e);
+ }
+ }
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(
- JobSpecification jobSpec, AqlCompiledDatasetDecl acedl,
- IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
- String itemTypeName = acedl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
+ AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+ String itemTypeName = acedl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- if (dataSource instanceof ExternalFeedDataSource) {
- AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
- .getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
+ if (dataSource instanceof ExternalFeedDataSource) {
+ AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
+ .getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
- return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(),
- acedl.getName(), itemType, acfdd, metadata.getFormat());
- } else {
- return buildExternalDataScannerRuntime(jobSpec, itemType,
- (AqlCompiledExternalDatasetDetails) acedl
- .getAqlCompiledDatasetDetails(), metadata
- .getFormat());
- }
- }
+ return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(), acedl.getName(), itemType, acfdd,
+ metadata.getFormat());
+ } else {
+ return buildExternalDataScannerRuntime(jobSpec, itemType,
+ (AqlCompiledExternalDatasetDetails) acedl.getAqlCompiledDatasetDetails(), metadata.getFormat());
+ }
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
- JobSpecification jobSpec, IAType itemType,
- AqlCompiledExternalDatasetDetails decl, IDataFormat format)
- throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
+ JobSpecification jobSpec, IAType itemType, AqlCompiledExternalDatasetDetails decl, IDataFormat format)
+ throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only scan datasets of records.");
+ }
- IDatasourceReadAdapter adapter;
- try {
- adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter())
- .newInstance();
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to load the adapter class "
- + e);
- }
+ IDatasourceReadAdapter adapter;
+ try {
+ adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter()).newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("unable to load the adapter class " + e);
+ }
- if (!(adapter.getAdapterType().equals(
- IDatasourceAdapter.AdapterType.READ) || adapter
- .getAdapterType().equals(
- IDatasourceAdapter.AdapterType.READ_WRITE))) {
- throw new AlgebricksException(
- "external dataset adapter does not support read operation");
- }
- ARecordType rt = (ARecordType) itemType;
+ if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
+ IDatasourceAdapter.AdapterType.READ_WRITE))) {
+ throw new AlgebricksException("external dataset adapter does not support read operation");
+ }
+ ARecordType rt = (ARecordType) itemType;
- try {
- adapter.configure(decl.getProperties(), itemType);
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException(
- "unable to configure the datasource adapter " + e);
- }
+ try {
+ adapter.configure(decl.getProperties(), itemType);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("unable to configure the datasource adapter " + e);
+ }
- ISerializerDeserializer payloadSerde = format.getSerdeProvider()
- .getSerializerDeserializer(itemType);
- RecordDescriptor scannerDesc = new RecordDescriptor(
- new ISerializerDeserializer[] { payloadSerde });
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+ RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(
- jobSpec, decl.getAdapter(), decl.getProperties(), rt,
- scannerDesc);
- dataScanner.setDatasourceAdapter(adapter);
- AlgebricksPartitionConstraint constraint = adapter
- .getPartitionConstraint();
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- dataScanner, constraint);
- }
+ ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
+ decl.getAdapter(), decl.getProperties(), rt, scannerDesc);
+ dataScanner.setDatasourceAdapter(adapter);
+ AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
- JobSpecification jobSpec, IAType itemType,
- IParseFileSplitsDecl decl, IDataFormat format)
- throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
- ARecordType rt = (ARecordType) itemType;
- ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
- FileSplit[] splits = decl.getSplits();
- IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(
- splits);
- ISerializerDeserializer payloadSerde = format.getSerdeProvider()
- .getSerializerDeserializer(itemType);
- RecordDescriptor scannerDesc = new RecordDescriptor(
- new ISerializerDeserializer[] { payloadSerde });
- IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec,
- scannerSplitProvider, tupleParser, scannerDesc);
- String[] locs = new String[splits.length];
- for (int i = 0; i < splits.length; i++) {
- locs[i] = splits[i].getNodeName();
- }
- AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(
- locs);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- scanner, apc);
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
+ JobSpecification jobSpec, IAType itemType, IParseFileSplitsDecl decl, IDataFormat format)
+ throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only scan datasets of records.");
+ }
+ ARecordType rt = (ARecordType) itemType;
+ ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
+ FileSplit[] splits = decl.getSplits();
+ IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(splits);
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+ RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+ IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec, scannerSplitProvider, tupleParser,
+ scannerDesc);
+ String[] locs = new String[splits.length];
+ for (int i = 0; i < splits.length; i++) {
+ locs[i] = splits[i].getNodeName();
+ }
+ AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(locs);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
- JobSpecification jobSpec, String dataverse, String dataset,
- IAType itemType, AqlCompiledFeedDatasetDetails decl,
- IDataFormat format) throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only consume records.");
- }
- IDatasourceAdapter adapter;
- try {
- adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter())
- .newInstance();
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to load the adapter class "
- + e);
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
+ JobSpecification jobSpec, String dataverse, String dataset, IAType itemType,
+ AqlCompiledFeedDatasetDetails decl, IDataFormat format) throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only consume records.");
+ }
+ IDatasourceAdapter adapter;
+ try {
+ adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("unable to load the adapter class " + e);
+ }
- ARecordType rt = (ARecordType) itemType;
- try {
- adapter.configure(decl.getProperties(), itemType);
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException(
- "unable to configure the datasource adapter " + e);
- }
+ ARecordType rt = (ARecordType) itemType;
+ try {
+ adapter.configure(decl.getProperties(), itemType);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("unable to configure the datasource adapter " + e);
+ }
- ISerializerDeserializer payloadSerde = format.getSerdeProvider()
- .getSerializerDeserializer(itemType);
- RecordDescriptor feedDesc = new RecordDescriptor(
- new ISerializerDeserializer[] { payloadSerde });
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+ RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(
- jobSpec, new FeedId(dataverse, dataset), decl.getAdapter(),
- decl.getProperties(), rt, feedDesc);
+ FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(dataverse,
+ dataset), decl.getAdapter(), decl.getProperties(), rt, feedDesc);
- AlgebricksPartitionConstraint constraint = adapter
- .getPartitionConstraint();
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- feedIngestor, constraint);
- }
+ AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
- JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata,
- AqlCompiledFeedDatasetDetails decl, String dataverse,
- String dataset, List<IFeedMessage> feedMessages)
- throws AlgebricksException {
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
+ JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata, AqlCompiledFeedDatasetDetails decl,
+ String dataverse, String dataset, List<IFeedMessage> feedMessages) throws AlgebricksException {
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- dataset, dataset);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset, dataset);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(
- jobSpec, dataverse, dataset, feedMessages);
+ FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
+ feedMessages);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- feedMessenger, spPc.second);
- }
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, spPc.second);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
- AqlCompiledMetadataDeclarations metadata, JobGenContext context,
- JobSpecification jobSpec, String datasetName,
- AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
- int[] highKeyFields, boolean lowKeyInclusive,
- boolean highKeyInclusive) throws AlgebricksException {
- String itemTypeName = ddecl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
+ AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
+ String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
+ int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
+ String itemTypeName = ddecl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- boolean isSecondary = true;
- AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
+ boolean isSecondary = true;
+ AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
- if (primIdxDecl != null) {
- isSecondary = !indexName.equals(primIdxDecl.getIndexName());
- }
+ if (primIdxDecl != null) {
+ isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+ }
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl)
- .size();
- ISerializerDeserializer[] recordFields;
- IBinaryComparatorFactory[] comparatorFactories;
- ITypeTraits[] typeTraits;
- int numSecondaryKeys = 0;
- int i = 0;
- if (isSecondary) {
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
- ddecl, indexName);
- if (cid == null) {
- throw new AlgebricksException(
- "Code generation error: no index " + indexName
- + " for dataset " + datasetName);
- }
- List<String> secondaryKeyFields = cid.getFieldExprs();
- numSecondaryKeys = secondaryKeyFields.size();
- int numKeys = numSecondaryKeys + numPrimaryKeys;
- recordFields = new ISerializerDeserializer[numKeys];
- typeTraits = new ITypeTraits[numKeys];
- // comparatorFactories = new
- // IBinaryComparatorFactory[numSecondaryKeys];
- comparatorFactories = new IBinaryComparatorFactory[numKeys];
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException(
- "Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
- for (i = 0; i < numSecondaryKeys; i++) {
- Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyFields.get(i), recType);
- IAType keyType = keyTypePair.first;
- ISerializerDeserializer keySerde = metadata.getFormat()
- .getSerdeProvider().getSerializerDeserializer(keyType);
- recordFields[i] = keySerde;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(keyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE
- .getTypeTrait(keyType);
- }
- } else {
- recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- typeTraits = new ITypeTraits[numPrimaryKeys + 1];
- ISerializerDeserializer payloadSerde = metadata.getFormat()
- .getSerdeProvider().getSerializerDeserializer(itemType);
- recordFields[numPrimaryKeys] = payloadSerde;
- typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
- .getTypeTrait(itemType);
- }
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
+ ISerializerDeserializer[] recordFields;
+ IBinaryComparatorFactory[] comparatorFactories;
+ ITypeTraits[] typeTraits;
+ int numSecondaryKeys = 0;
+ int i = 0;
+ if (isSecondary) {
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
+ if (cid == null) {
+ throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+ + datasetName);
+ }
+ List<String> secondaryKeyFields = cid.getFieldExprs();
+ numSecondaryKeys = secondaryKeyFields.size();
+ int numKeys = numSecondaryKeys + numPrimaryKeys;
+ recordFields = new ISerializerDeserializer[numKeys];
+ typeTraits = new ITypeTraits[numKeys];
+ // comparatorFactories = new
+ // IBinaryComparatorFactory[numSecondaryKeys];
+ comparatorFactories = new IBinaryComparatorFactory[numKeys];
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
+ for (i = 0; i < numSecondaryKeys; i++) {
+ Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
+ secondaryKeyFields.get(i), recType);
+ IAType keyType = keyTypePair.first;
+ ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
+ .getSerializerDeserializer(keyType);
+ recordFields[i] = keySerde;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ } else {
+ recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ typeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ ISerializerDeserializer payloadSerde = metadata.getFormat().getSerdeProvider()
+ .getSerializerDeserializer(itemType);
+ recordFields[numPrimaryKeys] = payloadSerde;
+ typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+ }
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(ddecl)) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = metadata.getFormat()
- .getSerdeProvider().getSerializerDeserializer(keyType);
- recordFields[i] = keySerde;
- // if (!isSecondary) {
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(keyType, true);
- // }
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(ddecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
+ .getSerializerDeserializer(keyType);
+ recordFields[i] = keySerde;
+ // if (!isSecondary) {
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ // }
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
- .getAppContext();
- RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ RecordDescriptor recDesc = new RecordDescriptor(recordFields);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(
- jobSpec, recDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexRegistryProvider(), spPc.first,
- typeTraits,
- comparatorFactories, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive,
- new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- btreeSearchOp, spPc.second);
- }
+ BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, recDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
+ comparatorFactories, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+ new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
- AqlCompiledMetadataDeclarations metadata, JobGenContext context,
- JobSpecification jobSpec, String datasetName,
- AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
- throws AlgebricksException {
- String itemTypeName = ddecl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
+ AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
+ String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
+ throws AlgebricksException {
+ String itemTypeName = ddecl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- boolean isSecondary = true;
- AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
- if (primIdxDecl != null) {
- isSecondary = !indexName.equals(primIdxDecl.getIndexName());
- }
+ boolean isSecondary = true;
+ AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
+ if (primIdxDecl != null) {
+ isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+ }
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl)
- .size();
- ISerializerDeserializer[] recordFields;
- IBinaryComparatorFactory[] comparatorFactories;
- ITypeTraits[] typeTraits;
- IPrimitiveValueProviderFactory[] valueProviderFactories;
- int numSecondaryKeys = 0;
- int numNestedSecondaryKeyFields = 0;
- int i = 0;
- if (isSecondary) {
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
- ddecl, indexName);
- if (cid == null) {
- throw new AlgebricksException(
- "Code generation error: no index " + indexName
- + " for dataset " + datasetName);
- }
- List<String> secondaryKeyFields = cid.getFieldExprs();
- numSecondaryKeys = secondaryKeyFields.size();
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
+ ISerializerDeserializer[] recordFields;
+ IBinaryComparatorFactory[] comparatorFactories;
+ ITypeTraits[] typeTraits;
+ IPrimitiveValueProviderFactory[] valueProviderFactories;
+ int numSecondaryKeys = 0;
+ int numNestedSecondaryKeyFields = 0;
+ int i = 0;
+ if (isSecondary) {
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
+ if (cid == null) {
+ throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+ + datasetName);
+ }
+ List<String> secondaryKeyFields = cid.getFieldExprs();
+ numSecondaryKeys = secondaryKeyFields.size();
- if (numSecondaryKeys != 1) {
- throw new AlgebricksException(
- "Cannot use "
- + numSecondaryKeys
- + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
- }
+ if (numSecondaryKeys != 1) {
+ throw new AlgebricksException(
+ "Cannot use "
+ + numSecondaryKeys
+ + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+ }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException(
- "Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
- Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyFields.get(0), recType);
- IAType keyType = keyTypePair.first;
- if (keyType == null) {
- throw new AlgebricksException("Could not find field "
- + secondaryKeyFields.get(0) + " in the schema.");
- }
+ Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
+ secondaryKeyFields.get(0), recType);
+ IAType keyType = keyTypePair.first;
+ if (keyType == null) {
+ throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+ }
- int dimension = NonTaggedFormatUtil.getNumDimensions(keyType
- .getTypeTag());
- numNestedSecondaryKeyFields = dimension * 2;
+ int dimension = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
+ numNestedSecondaryKeyFields = dimension * 2;
- int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
- recordFields = new ISerializerDeserializer[numFields];
- typeTraits = new ITypeTraits[numFields];
- comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
- valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+ int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
+ recordFields = new ISerializerDeserializer[numFields];
+ typeTraits = new ITypeTraits[numFields];
+ comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+ valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
- IAType nestedKeyType = NonTaggedFormatUtil
- .getNestedSpatialType(keyType.getTypeTag());
- for (i = 0; i < numNestedSecondaryKeyFields; i++) {
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(nestedKeyType);
- recordFields[i] = keySerde;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(nestedKeyType,
- true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE
- .getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
- } else {
- throw new AlgebricksException(
- "R-tree can only be used as a secondary index");
- }
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+ for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ recordFields[i] = keySerde;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ nestedKeyType, true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+ } else {
+ throw new AlgebricksException("R-tree can only be used as a secondary index");
+ }
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(ddecl)) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(keyType);
- recordFields[i] = keySerde;
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(ddecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(keyType);
+ recordFields[i] = keySerde;
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
- .getAppContext();
- RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
+ appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
+ comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories),
+ NoOpOperationCallbackProvider.INSTANCE);
- RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(
- jobSpec, recDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexRegistryProvider(), spPc.first,
- typeTraits,
- comparatorFactories, keyFields,
- new RTreeDataflowHelperFactory(valueProviderFactories), NoOpOperationCallbackProvider.INSTANCE);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
+ }
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- rtreeSearchOp, spPc.second);
- }
+ @Override
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+ FileSplitDataSink fsds = (FileSplitDataSink) sink;
+ FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
+ FileSplit fs = fssi.getFileSplit();
+ File outFile = fs.getLocalFile().getFile();
+ String nodeId = fs.getNodeName();
- @Override
- public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(
- IDataSink sink, int[] printColumns,
- IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
- FileSplitDataSink fsds = (FileSplitDataSink) sink;
- FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
- FileSplit fs = fssi.getFileSplit();
- File outFile = fs.getLocalFile().getFile();
- String nodeId = fs.getNodeName();
+ SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
+ metadata.getWriterFactory(), inputDesc);
+ AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
+ return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
+ }
- SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(
- printColumns, printerFactories, outFile, metadata
- .getWriterFactory(), inputDesc);
- AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(
- new String[] { nodeId });
- return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(
- runtime, apc);
- }
+ @Override
+ public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
+ throws AlgebricksException {
+ AqlDataSource ads = findDataSource(dataSourceId);
+ AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
+ if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AlgebricksException("No index for external dataset " + dataSourceId);
+ }
- @Override
- public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(
- String indexId, AqlSourceId dataSourceId)
- throws AlgebricksException {
- AqlDataSource ads = findDataSource(dataSourceId);
- AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
- if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
- throw new AlgebricksException("No index for external dataset "
- + dataSourceId);
- }
+ String idxName = (String) indexId;
+ AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(adecl, idxName);
+ AqlSourceId asid = (AqlSourceId) dataSourceId;
+ if (acid != null) {
+ return new AqlIndex(acid, metadata, asid.getDatasetName());
+ } else {
+ AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
+ if (primIdx.getIndexName().equals(indexId)) {
+ return new AqlIndex(primIdx, metadata, asid.getDatasetName());
+ } else {
+ return null;
+ }
+ }
+ }
- String idxName = (String) indexId;
- AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(
- adecl, idxName);
- AqlSourceId asid = (AqlSourceId) dataSourceId;
- if (acid != null) {
- return new AqlIndex(acid, metadata, asid.getDatasetName());
- } else {
- AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
- if (primIdx.getIndexName().equals(indexId)) {
- return new AqlIndex(primIdx, metadata, asid.getDatasetName());
- } else {
- return null;
- }
- }
- }
+ public static AqlDataSource lookupSourceInMetadata(AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
+ throws AlgebricksException {
+ if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
+ return null;
+ }
+ AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId.getDatasetName());
+ if (acdd == null) {
+ throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
+ }
+ String tName = acdd.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(tName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ return new AqlDataSource(aqlId, acdd, itemType);
+ }
- public static AqlDataSource lookupSourceInMetadata(
- AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
- throws AlgebricksException {
- if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
- return null;
- }
- AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId
- .getDatasetName());
- if (acdd == null) {
- throw new AlgebricksException("Datasource with id " + aqlId
- + " was not found.");
- }
- String tName = acdd.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(tName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- return new AqlDataSource(aqlId, acdd, itemType);
- }
+ @Override
+ public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
+ AqlSourceId asid = dataSource.getId();
+ String datasetName = asid.getDatasetName();
+ AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
+ if (adecl == null) {
+ throw new IllegalArgumentException("Unknown dataset " + datasetName);
+ }
+ return adecl.getDatasetType() == DatasetType.EXTERNAL;
+ }
- @Override
- public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
- AqlSourceId asid = dataSource.getId();
- String datasetName = asid.getDatasetName();
- AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
- if (adecl == null) {
- throw new IllegalArgumentException("Unknown dataset " + datasetName);
- }
- return adecl.getDatasetType() == DatasetType.EXTERNAL;
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+ LogicalVariable payload, JobGenContext context, JobSpecification spec) throws AlgebricksException {
+ String datasetName = dataSource.getId().getDatasetName();
+ int numKeys = keys.size();
+ // move key fields to front
+ int[] fieldPermutation = new int[numKeys + 1];
+ // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+ int i = 0;
+ for (LogicalVariable varKey : keys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
- IDataSource<AqlSourceId> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- String datasetName = dataSource.getId().getDatasetName();
- int numKeys = keys.size();
- // move key fields to front
- int[] fieldPermutation = new int[numKeys + 1];
- // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata
- .findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
- .getIndexName();
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
- compiledDatasetDecl, metadata);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
- .getAppContext();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+ compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
- .computeKeysBinaryComparatorFactories(compiledDatasetDecl,
- context.getBinaryComparatorFactoryProvider());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+ indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
+ GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
+ }
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
- spec, appContext.getStorageManagerInterface(),
- appContext.getIndexRegistryProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories,
- fieldPermutation, GlobalConfig.DEFAULT_BTREE_FILL_FACTOR,
- new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- btreeBulkLoad, splitsAndConstraint.second);
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+ LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ String datasetName = dataSource.getId().getDatasetName();
+ int numKeys = keys.size();
+ // move key fields to front
+ int[] fieldPermutation = new int[numKeys + 1];
+ // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+ int i = 0;
+ for (LogicalVariable varKey : keys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
- IDataSource<AqlSourceId> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
- String datasetName = dataSource.getId().getDatasetName();
- int numKeys = keys.size();
- // move key fields to front
- int[] fieldPermutation = new int[numKeys + 1];
- // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata
- .findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
- .getIndexName();
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
- compiledDatasetDecl, metadata);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
- .getAppContext();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+ compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
- .computeKeysBinaryComparatorFactories(compiledDatasetDecl,
- context.getBinaryComparatorFactoryProvider());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+ indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.INSERT,
+ new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, txnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
+ }
- TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexRegistryProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories,
- fieldPermutation, IndexOp.INSERT,
- new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackProvider.INSTANCE, txnId);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- btreeBulkLoad, splitsAndConstraint.second);
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
+ IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+ LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ String datasetName = dataSource.getId().getDatasetName();
+ int numKeys = keys.size();
+ // move key fields to front
+ int[] fieldPermutation = new int[numKeys + 1];
+ // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+ int i = 0;
+ for (LogicalVariable varKey : keys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
- IDataSource<AqlSourceId> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
- String datasetName = dataSource.getId().getDatasetName();
- int numKeys = keys.size();
- // move key fields to front
- int[] fieldPermutation = new int[numKeys + 1];
- // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata
- .findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
- .getIndexName();
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
- compiledDatasetDecl, metadata);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
- .getAppContext();
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+ compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
- .computeKeysBinaryComparatorFactories(compiledDatasetDecl,
- context.getBinaryComparatorFactoryProvider());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+ indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.DELETE,
+ new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, txnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
+ }
- TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexRegistryProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories,
- fieldPermutation, IndexOp.DELETE,
- new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackProvider.INSTANCE, txnId);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- btreeBulkLoad, splitsAndConstraint.second);
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec) throws AlgebricksException {
+ String indexName = dataSourceIndex.getId();
+ String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+ AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
+ if (cid.getKind() == IndexKind.BTREE) {
+ return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+ filterFactory, recordDesc, context, spec, IndexOp.INSERT);
+ } else {
+ return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+ filterFactory, recordDesc, context, spec, IndexOp.INSERT);
+ }
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
- IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
- RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- String indexName = dataSourceIndex.getId();
- String datasetName = dataSourceIndex.getDataSource().getId()
- .getDatasetName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata
- .findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
- compiledDatasetDecl, indexName);
- AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(
- inputSchemas, typeEnv, filterExpr, context);
- if (cid.getKind() == IndexKind.BTREE) {
- return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
- IndexOp.INSERT);
- } else {
- return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
- IndexOp.INSERT);
- }
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec) throws AlgebricksException {
+ String indexName = dataSourceIndex.getId();
+ String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+ AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
+ if (cid.getKind() == IndexKind.BTREE) {
+ return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+ filterFactory, recordDesc, context, spec, IndexOp.DELETE);
+ } else {
+ return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+ filterFactory, recordDesc, context, spec, IndexOp.DELETE);
+ }
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
- IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
- RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- String indexName = dataSourceIndex.getId();
- String datasetName = dataSourceIndex.getDataSource().getId()
- .getDatasetName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata
- .findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
- compiledDatasetDecl, indexName);
- AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(
- inputSchemas, typeEnv, filterExpr, context);
- if (cid.getKind() == IndexKind.BTREE) {
- return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
- IndexOp.DELETE);
- } else {
- return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
- IndexOp.DELETE);
- }
- }
+ private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
+ throws AlgebricksException {
+ // No filtering condition.
+ if (filterExpr == null) {
+ return null;
+ }
+ ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
+ IEvaluatorFactory filterEvalFactory = exprJobGen.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas,
+ context);
+ return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspector());
+ }
- private AsterixTupleFilterFactory createTupleFilterFactory(
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
- ILogicalExpression filterExpr, JobGenContext context)
- throws AlgebricksException {
- // No filtering condition.
- if (filterExpr == null) {
- return null;
- }
- ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
- IEvaluatorFactory filterEvalFactory = exprJobGen
- .createEvaluatorFactory(filterExpr, typeEnv, inputSchemas,
- context);
- return new AsterixTupleFilterFactory(filterEvalFactory,
- context.getBinaryBooleanInspector());
- }
-
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(
- String datasetName, String indexName,
- IOperatorSchema propagatedSchema,
- List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys,
- AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
- int numKeys = primaryKeys.size() + secondaryKeys.size();
- // generate field permutations
- int[] fieldPermutation = new int[numKeys];
- int i = 0;
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String datasetName,
+ String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
+ int numKeys = primaryKeys.size() + secondaryKeys.size();
+ // generate field permutations
+ int[] fieldPermutation = new int[numKeys];
+ int i = 0;
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
- // dataset
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata
- .findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- String itemTypeName = compiledDatasetDecl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
+ // dataset
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ String itemTypeName = compiledDatasetDecl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
- // index parameters
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
- compiledDatasetDecl, indexName);
- List<String> secondaryKeyExprs = cid.getFieldExprs();
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
- for (i = 0; i < secondaryKeys.size(); ++i) {
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(
- secondaryKeyExprs.get(i).toString(), recType);
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(keyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(compiledDatasetDecl)) {
- IAType keyType = evalFactoryAndType.third;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(keyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ // index parameters
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+ List<String> secondaryKeyExprs = cid.getFieldExprs();
+ ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+ for (i = 0; i < secondaryKeys.size(); ++i) {
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyExprs.get(i).toString(), recType);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(compiledDatasetDecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
- .getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexRegistryProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories,
- fieldPermutation, indexOp, new BTreeDataflowHelperFactory(),
- filterFactory, NoOpOperationCallbackProvider.INSTANCE, txnId);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- btreeBulkLoad, splitsAndConstraint.second);
- }
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+ indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
+ new BTreeDataflowHelperFactory(), filterFactory, NoOpOperationCallbackProvider.INSTANCE, txnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
+ }
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(
- String datasetName, String indexName,
- IOperatorSchema propagatedSchema,
- List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys,
- AsterixTupleFilterFactory filterFactory,
- RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata
- .findDataset(datasetName);
- String itemTypeName = compiledDatasetDecl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
- compiledDatasetDecl, indexName);
- List<String> secondaryKeyExprs = cid.getFieldExprs();
- Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
- secondaryKeyExprs.get(0), recType);
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String datasetName,
+ String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+ String itemTypeName = compiledDatasetDecl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+ List<String> secondaryKeyExprs = cid.getFieldExprs();
+ Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyExprs.get(0),
+ recType);
IAType spatialType = keyPairType.first;
- int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType
- .getTypeTag());
- int numSecondaryKeys = dimension * 2;
- int numPrimaryKeys = primaryKeys.size();
- int numKeys = numSecondaryKeys + numPrimaryKeys;
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
- int[] fieldPermutation = new int[numKeys];
- int i = 0;
+ int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
+ int numSecondaryKeys = dimension * 2;
+ int numPrimaryKeys = primaryKeys.size();
+ int numKeys = numSecondaryKeys + numPrimaryKeys;
+ ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+ int[] fieldPermutation = new int[numKeys];
+ int i = 0;
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- IAType nestedKeyType = NonTaggedFormatUtil
- .getNestedSpatialType(spatialType.getTypeTag());
- IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
- for (i = 0; i < numSecondaryKeys; i++) {
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(nestedKeyType);
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(nestedKeyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE
- .getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(compiledDatasetDecl)) {
- IAType keyType = evalFactoryAndType.third;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(keyType, true);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+ IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ nestedKeyType, true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(compiledDatasetDecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
- .getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata
- .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
- datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(),
- appContext.getIndexRegistryProvider(),
- splitsAndConstraint.first, typeTraits, comparatorFactories,
- fieldPermutation, indexOp, new RTreeDataflowHelperFactory(
- valueProviderFactories), filterFactory,
- NoOpOperationCallbackProvider.INSTANCE, txnId);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
- rtreeUpdate, splitsAndConstraint.second);
- }
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+ indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+ splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
+ new RTreeDataflowHelperFactory(valueProviderFactories), filterFactory,
+ NoOpOperationCallbackProvider.INSTANCE, txnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
+ }
- public long getTxnId() {
- return txnId;
- }
+ public long getTxnId() {
+ return txnId;
+ }
- public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(
- ITypeTraits[] typeTraits) {
- return new BTreeNSMInteriorFrameFactory(
- new TypeAwareTupleWriterFactory(typeTraits));
- }
+ public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
+ return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
+ }
- public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(
- ITypeTraits[] typeTraits) {
- return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
- typeTraits));
- }
+ public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(ITypeTraits[] typeTraits) {
+ return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
+ }
- @Override
- public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
- return AsterixBuiltinFunctions.lookupFunction(fid);
- }
-
+ @Override
+ public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+ return AsterixBuiltinFunctions.lookupFunction(fid);
+ }
}