During creation of a jobspec for a job that involved the ExternalDataScanOperator, the adapter is now configured at compile time and the oncfigured instance is set inside the operator. At runtime, the instance is deserialized to retain all information that was populated at compile time. This saves effort in re-computing the stuff by all operator instances
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@131 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index 4fef771..332b48b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -16,6 +16,7 @@
import java.util.Map;
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
@@ -36,8 +37,7 @@
private final String adapter;
private final Map<String, String> adapterConfiguration;
private final IAType atype;
-
- private transient IDatasourceReadAdapter datasourceReadAdapter;
+ private IDatasourceReadAdapter datasourceReadAdapter;
public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
IAType atype, RecordDescriptor rDesc) {
@@ -86,7 +86,7 @@
throws HyracksDataException {
try {
- datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
+ //datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
datasourceReadAdapter.configure(adapterConfiguration, atype);
datasourceReadAdapter.initialize(ctx);
} catch (Exception e) {
@@ -106,4 +106,8 @@
}
};
}
+
+ public void setDatasourceAdapter(IDatasourceReadAdapter adapterInstance){
+ this.datasourceReadAdapter = adapterInstance;
+ }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 320a29f..dd153ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -40,13 +40,13 @@
protected Map<String, String> configuration;
- protected AlgebricksPartitionConstraint partitionConstraint;
+ protected transient AlgebricksPartitionConstraint partitionConstraint;
protected IAType atype;
protected IHyracksTaskContext ctx;
- protected IDataParser dataParser;
+ protected transient IDataParser dataParser;
protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index b59f034..8bd2fb0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -26,15 +26,14 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.Counters.Counter;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
-import edu.uci.ics.asterix.external.data.parser.DelimitedDataStreamParser;
import edu.uci.ics.asterix.external.data.parser.IDataParser;
import edu.uci.ics.asterix.external.data.parser.IDataStreamParser;
import edu.uci.ics.asterix.om.types.ARecordType;
@@ -56,12 +55,13 @@
private List<String> hdfsPaths;
private String inputFormatClassName;
private Object[] inputSplits;
- private JobConf conf;
+ private transient JobConf conf;
private IHyracksTaskContext ctx;
private Reporter reporter;
private boolean isDelimited;
private Character delimiter;
private InputSplitsProxy inputSplitsProxy;
+ private String parserClass;
private static final Map<String, String> formatClassNames = new HashMap<String, String>();
public static final String KEY_HDFS_URL = "hdfs";
@@ -108,7 +108,7 @@
throw new Exception("format " + format + " not supported");
}
- String parserClass = configuration.get(KEY_PARSER);
+ parserClass = configuration.get(KEY_PARSER);
if (parserClass == null) {
if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
@@ -116,18 +116,23 @@
parserClass = formatToParserMap.get(FORMAT_ADM);
}
}
+
+ }
+ private void configureParser() throws Exception {
dataParser = (IDataParser) Class.forName(parserClass).newInstance();
dataParser.configure(configuration);
}
private void configurePartitionConstraint() throws Exception {
- InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
- inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
- partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
- hdfsPaths = new ArrayList<String>();
- for (String hdfsPath : configuration.get(KEY_HDFS_PATH).split(",")) {
- hdfsPaths.add(hdfsPath);
+ if (inputSplitsProxy == null) {
+ InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
+ inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
+ partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
+ hdfsPaths = new ArrayList<String>();
+ for (String hdfsPath : configuration.get(KEY_HDFS_PATH).split(",")) {
+ hdfsPaths.add(hdfsPath);
+ }
}
}
@@ -172,6 +177,7 @@
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
inputSplits = inputSplitsProxy.toInputSplits(conf);
+ configureParser();
dataParser.initialize((ARecordType) atype, ctx);
reporter = new Reporter() {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 2449db0..385ffe5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -34,7 +34,8 @@
public class NCFileSystemAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
private static final long serialVersionUID = -4154256369973615710L;
- protected FileSplit[] fileSplits;
+ private FileSplit[] fileSplits;
+ private String parserClass;
public class Constants {
public static final String KEY_SPLITS = "path";
@@ -69,6 +70,7 @@
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
+ configureDataParser();
dataParser.initialize((ARecordType) atype, ctx);
}
@@ -101,20 +103,22 @@
}
private void configureFileSplits(String[] splits) {
- fileSplits = new FileSplit[splits.length];
- String nodeName;
- String nodeLocalPath;
- int count = 0;
- for (String splitPath : splits) {
- nodeName = splitPath.split(":")[0];
- nodeLocalPath = splitPath.split("://")[1];
- FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
- fileSplits[count++] = fileSplit;
+ if (fileSplits == null) {
+ fileSplits = new FileSplit[splits.length];
+ String nodeName;
+ String nodeLocalPath;
+ int count = 0;
+ for (String splitPath : splits) {
+ nodeName = splitPath.split(":")[0];
+ nodeLocalPath = splitPath.split("://")[1];
+ FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+ fileSplits[count++] = fileSplit;
+ }
}
}
protected void configureFormat() throws Exception {
- String parserClass = configuration.get(Constants.KEY_PARSER);
+ parserClass = configuration.get(Constants.KEY_PARSER);
if (parserClass == null) {
if (Constants.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
parserClass = formatToParserMap.get(FORMAT_DELIMITED_TEXT);
@@ -124,6 +128,10 @@
throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
}
}
+
+ }
+
+ private void configureDataParser() throws Exception {
dataParser = (IDataParser) Class.forName(parserClass).newInstance();
dataParser.configure(configuration);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 2d85fe3..17deaaf 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -24,6 +24,7 @@
import edu.uci.ics.asterix.dataflow.base.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
import edu.uci.ics.asterix.feed.comm.IFeedMessage;
import edu.uci.ics.asterix.feed.mgmt.FeedId;
@@ -87,846 +88,1025 @@
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory;
-public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
- private final long txnId;
- private boolean isWriteTransaction;
- private final AqlCompiledMetadataDeclarations metadata;
+public class AqlMetadataProvider implements
+ IMetadataProvider<AqlSourceId, String> {
+ private final long txnId;
+ private boolean isWriteTransaction;
+ private final AqlCompiledMetadataDeclarations metadata;
- public AqlMetadataProvider(long txnId, boolean isWriteTransaction, AqlCompiledMetadataDeclarations metadata) {
- this.txnId = txnId;
- this.isWriteTransaction = isWriteTransaction;
- this.metadata = metadata;
- }
+ public AqlMetadataProvider(long txnId, boolean isWriteTransaction,
+ AqlCompiledMetadataDeclarations metadata) {
+ this.txnId = txnId;
+ this.isWriteTransaction = isWriteTransaction;
+ this.metadata = metadata;
+ }
- @Override
- public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
- AqlSourceId aqlId = (AqlSourceId) id;
- return lookupSourceInMetadata(metadata, aqlId);
- }
+ @Override
+ public AqlDataSource findDataSource(AqlSourceId id)
+ throws AlgebricksException {
+ AqlSourceId aqlId = (AqlSourceId) id;
+ return lookupSourceInMetadata(metadata, aqlId);
+ }
- public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
- return metadata;
- }
+ public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
+ return metadata;
+ }
- public boolean isWriteTransaction() {
- return isWriteTransaction;
- }
+ public boolean isWriteTransaction() {
+ return isWriteTransaction;
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
- IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
- List<LogicalVariable> projectVariables, boolean projectPushed, JobGenContext context,
- JobSpecification jobSpec) throws AlgebricksException {
- AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId().getDatasetName());
- if (adecl == null) {
- throw new AlgebricksException("Unknown dataset " + dataSource.getId().getDatasetName());
- }
- switch (adecl.getDatasetType()) {
- case FEED:
- if (dataSource instanceof ExternalFeedDataSource) {
- return buildExternalDatasetScan(jobSpec, adecl, dataSource);
- } else {
- return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
+ IDataSource<AqlSourceId> dataSource,
+ List<LogicalVariable> scanVariables,
+ List<LogicalVariable> projectVariables, boolean projectPushed,
+ JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException {
+ AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId()
+ .getDatasetName());
+ if (adecl == null) {
+ throw new AlgebricksException("Unknown dataset "
+ + dataSource.getId().getDatasetName());
+ }
+ switch (adecl.getDatasetType()) {
+ case FEED:
+ if (dataSource instanceof ExternalFeedDataSource) {
+ return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+ } else {
+ return buildInternalDatasetScan(jobSpec, adecl, dataSource,
+ context);
+ }
- case INTERNAL: {
- return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
- }
+ case INTERNAL: {
+ return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
+ }
- case EXTERNAL: {
- return buildExternalDatasetScan(jobSpec, adecl, dataSource);
- }
+ case EXTERNAL: {
+ return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+ }
- default: {
- throw new IllegalArgumentException();
- }
- }
- }
+ default: {
+ throw new IllegalArgumentException();
+ }
+ }
+ }
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
- AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource, JobGenContext context)
- throws AlgebricksException {
- AqlSourceId asid = dataSource.getId();
- String datasetName = asid.getDatasetName();
- String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(
+ JobSpecification jobSpec, AqlCompiledDatasetDecl acedl,
+ IDataSource<AqlSourceId> dataSource, JobGenContext context)
+ throws AlgebricksException {
+ AqlSourceId asid = dataSource.getId();
+ String datasetName = asid.getDatasetName();
+ String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
- try {
- return buildBtreeRuntime(metadata, context, jobSpec, datasetName, acedl, indexName, null, null, true, true);
- } catch (AlgebricksException e) {
- throw new AlgebricksException(e);
- }
- }
+ try {
+ return buildBtreeRuntime(metadata, context, jobSpec, datasetName,
+ acedl, indexName, null, null, true, true);
+ } catch (AlgebricksException e) {
+ throw new AlgebricksException(e);
+ }
+ }
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
- AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
- String itemTypeName = acedl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(
+ JobSpecification jobSpec, AqlCompiledDatasetDecl acedl,
+ IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+ String itemTypeName = acedl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- if (dataSource instanceof ExternalFeedDataSource) {
- AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
- .getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
+ if (dataSource instanceof ExternalFeedDataSource) {
+ AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
+ .getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
- return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(), acedl.getName(), itemType, acfdd,
- metadata.getFormat());
- } else {
- return buildExternalDataScannerRuntime(jobSpec, itemType,
- (AqlCompiledExternalDatasetDetails) acedl.getAqlCompiledDatasetDetails(), metadata.getFormat());
- }
- }
+ return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(),
+ acedl.getName(), itemType, acfdd, metadata.getFormat());
+ } else {
+ return buildExternalDataScannerRuntime(jobSpec, itemType,
+ (AqlCompiledExternalDatasetDetails) acedl
+ .getAqlCompiledDatasetDetails(), metadata
+ .getFormat());
+ }
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
- JobSpecification jobSpec, IAType itemType, AqlCompiledExternalDatasetDetails decl, IDataFormat format)
- throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
+ JobSpecification jobSpec, IAType itemType,
+ AqlCompiledExternalDatasetDetails decl, IDataFormat format)
+ throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only scan datasets of records.");
+ }
- IDatasourceAdapter adapter;
- try {
- adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to load the adapter class " + e);
- }
+ IDatasourceReadAdapter adapter;
+ try {
+ adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter())
+ .newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("unable to load the adapter class "
+ + e);
+ }
- if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
- IDatasourceAdapter.AdapterType.READ_WRITE))) {
- throw new AlgebricksException("external dataset does not support read");
- }
- ARecordType rt = (ARecordType) itemType;
- try {
- adapter.configure(decl.getProperties(), itemType);
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to configure the datasource adapter " + e);
- }
+ if (!(adapter.getAdapterType().equals(
+ IDatasourceAdapter.AdapterType.READ) || adapter
+ .getAdapterType().equals(
+ IDatasourceAdapter.AdapterType.READ_WRITE))) {
+ throw new AlgebricksException(
+ "external dataset does not support read");
+ }
+ ARecordType rt = (ARecordType) itemType;
+ try {
+ adapter.configure(decl.getProperties(), itemType);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException(
+ "unable to configure the datasource adapter " + e);
+ }
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider()
+ .getSerializerDeserializer(itemType);
+ RecordDescriptor scannerDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { payloadSerde });
- ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
- decl.getAdapter(), decl.getProperties(), rt, scannerDesc);
- AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
- }
+ ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(
+ jobSpec, decl.getAdapter(), decl.getProperties(), rt,
+ scannerDesc);
+ dataScanner.setDatasourceAdapter(adapter);
+ AlgebricksPartitionConstraint constraint = adapter
+ .getPartitionConstraint();
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ dataScanner, constraint);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
- JobSpecification jobSpec, IAType itemType, IParseFileSplitsDecl decl, IDataFormat format)
- throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
- ARecordType rt = (ARecordType) itemType;
- ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
- FileSplit[] splits = decl.getSplits();
- IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(splits);
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec, scannerSplitProvider, tupleParser,
- scannerDesc);
- String[] locs = new String[splits.length];
- for (int i = 0; i < splits.length; i++) {
- locs[i] = splits[i].getNodeName();
- }
- AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(locs);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
+ JobSpecification jobSpec, IAType itemType,
+ IParseFileSplitsDecl decl, IDataFormat format)
+ throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only scan datasets of records.");
+ }
+ ARecordType rt = (ARecordType) itemType;
+ ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
+ FileSplit[] splits = decl.getSplits();
+ IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(
+ splits);
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider()
+ .getSerializerDeserializer(itemType);
+ RecordDescriptor scannerDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { payloadSerde });
+ IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec,
+ scannerSplitProvider, tupleParser, scannerDesc);
+ String[] locs = new String[splits.length];
+ for (int i = 0; i < splits.length; i++) {
+ locs[i] = splits[i].getNodeName();
+ }
+ AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(
+ locs);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ scanner, apc);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
- JobSpecification jobSpec, String dataverse, String dataset, IAType itemType,
- AqlCompiledFeedDatasetDetails decl, IDataFormat format) throws AlgebricksException {
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only consume records.");
- }
- IDatasourceAdapter adapter;
- try {
- adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to load the adapter class " + e);
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
+ JobSpecification jobSpec, String dataverse, String dataset,
+ IAType itemType, AqlCompiledFeedDatasetDetails decl,
+ IDataFormat format) throws AlgebricksException {
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only consume records.");
+ }
+ IDatasourceAdapter adapter;
+ try {
+ adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter())
+ .newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException("unable to load the adapter class "
+ + e);
+ }
- ARecordType rt = (ARecordType) itemType;
- try {
- adapter.configure(decl.getProperties(), itemType);
- } catch (Exception e) {
- e.printStackTrace();
- throw new AlgebricksException("unable to configure the datasource adapter " + e);
- }
+ ARecordType rt = (ARecordType) itemType;
+ try {
+ adapter.configure(decl.getProperties(), itemType);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new AlgebricksException(
+ "unable to configure the datasource adapter " + e);
+ }
- ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
- RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+ ISerializerDeserializer payloadSerde = format.getSerdeProvider()
+ .getSerializerDeserializer(itemType);
+ RecordDescriptor feedDesc = new RecordDescriptor(
+ new ISerializerDeserializer[] { payloadSerde });
- FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(dataverse,
- dataset), decl.getAdapter(), decl.getProperties(), rt, feedDesc);
+ FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(
+ jobSpec, new FeedId(dataverse, dataset), decl.getAdapter(),
+ decl.getProperties(), rt, feedDesc);
- AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
- }
+ AlgebricksPartitionConstraint constraint = adapter
+ .getPartitionConstraint();
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ feedIngestor, constraint);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
- JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata, AqlCompiledFeedDatasetDetails decl,
- String dataverse, String dataset, List<IFeedMessage> feedMessages) throws AlgebricksException {
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
+ JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata,
+ AqlCompiledFeedDatasetDetails decl, String dataverse,
+ String dataset, List<IFeedMessage> feedMessages)
+ throws AlgebricksException {
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset, dataset);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ dataset, dataset);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
- feedMessages);
+ FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(
+ jobSpec, dataverse, dataset, feedMessages);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, spPc.second);
- }
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ feedMessenger, spPc.second);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
- AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
- String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
- int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
- String itemTypeName = ddecl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
+ AqlCompiledMetadataDeclarations metadata, JobGenContext context,
+ JobSpecification jobSpec, String datasetName,
+ AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
+ int[] highKeyFields, boolean lowKeyInclusive,
+ boolean highKeyInclusive) throws AlgebricksException {
+ String itemTypeName = ddecl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- boolean isSecondary = true;
- AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
+ boolean isSecondary = true;
+ AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
- if (primIdxDecl != null) {
- isSecondary = !indexName.equals(primIdxDecl.getIndexName());
- }
+ if (primIdxDecl != null) {
+ isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+ }
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
- ISerializerDeserializer[] recordFields;
- IBinaryComparatorFactory[] comparatorFactories;
- ITypeTraits[] typeTraits;
- int numSecondaryKeys = 0;
- int i = 0;
- if (isSecondary) {
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
- if (cid == null) {
- throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
- + datasetName);
- }
- List<String> secondaryKeyFields = cid.getFieldExprs();
- numSecondaryKeys = secondaryKeyFields.size();
- int numKeys = numSecondaryKeys + numPrimaryKeys;
- recordFields = new ISerializerDeserializer[numKeys];
- typeTraits = new ITypeTraits[numKeys];
- // comparatorFactories = new
- // IBinaryComparatorFactory[numSecondaryKeys];
- comparatorFactories = new IBinaryComparatorFactory[numKeys];
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
- for (i = 0; i < numSecondaryKeys; i++) {
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(i), recType);
- ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
- .getSerializerDeserializer(keyType);
- recordFields[i] = keySerde;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, OrderKind.ASC);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- } else {
- recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
- comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
- typeTraits = new ITypeTraits[numPrimaryKeys + 1];
- ISerializerDeserializer payloadSerde = metadata.getFormat().getSerdeProvider()
- .getSerializerDeserializer(itemType);
- recordFields[numPrimaryKeys] = payloadSerde;
- typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
- }
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl)
+ .size();
+ ISerializerDeserializer[] recordFields;
+ IBinaryComparatorFactory[] comparatorFactories;
+ ITypeTraits[] typeTraits;
+ int numSecondaryKeys = 0;
+ int i = 0;
+ if (isSecondary) {
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+ ddecl, indexName);
+ if (cid == null) {
+ throw new AlgebricksException(
+ "Code generation error: no index " + indexName
+ + " for dataset " + datasetName);
+ }
+ List<String> secondaryKeyFields = cid.getFieldExprs();
+ numSecondaryKeys = secondaryKeyFields.size();
+ int numKeys = numSecondaryKeys + numPrimaryKeys;
+ recordFields = new ISerializerDeserializer[numKeys];
+ typeTraits = new ITypeTraits[numKeys];
+ // comparatorFactories = new
+ // IBinaryComparatorFactory[numSecondaryKeys];
+ comparatorFactories = new IBinaryComparatorFactory[numKeys];
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException(
+ "Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
+ for (i = 0; i < numSecondaryKeys; i++) {
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(
+ secondaryKeyFields.get(i), recType);
+ ISerializerDeserializer keySerde = metadata.getFormat()
+ .getSerdeProvider().getSerializerDeserializer(keyType);
+ recordFields[i] = keySerde;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(keyType, OrderKind.ASC);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(keyType);
+ }
+ } else {
+ recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+ comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+ typeTraits = new ITypeTraits[numPrimaryKeys + 1];
+ ISerializerDeserializer payloadSerde = metadata.getFormat()
+ .getSerdeProvider().getSerializerDeserializer(itemType);
+ recordFields[numPrimaryKeys] = payloadSerde;
+ typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(itemType);
+ }
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(ddecl)) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
- .getSerializerDeserializer(keyType);
- recordFields[i] = keySerde;
- // if (!isSecondary) {
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- OrderKind.ASC);
- // }
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(ddecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = metadata.getFormat()
+ .getSerdeProvider().getSerializerDeserializer(keyType);
+ recordFields[i] = keySerde;
+ // if (!isSecondary) {
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(keyType, OrderKind.ASC);
+ // }
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
- ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+ ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+ ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+ .getAppContext();
+ RecordDescriptor recDesc = new RecordDescriptor(recordFields);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, recDesc,
- appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(), spPc.first,
- interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories, true, lowKeyFields,
- highKeyFields, lowKeyInclusive, highKeyInclusive, new BTreeDataflowHelperFactory());
+ BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(
+ jobSpec, recDesc, appContext.getStorageManagerInterface(),
+ appContext.getTreeRegisterProvider(), spPc.first,
+ interiorFrameFactory, leafFrameFactory, typeTraits,
+ comparatorFactories, true, lowKeyFields, highKeyFields,
+ lowKeyInclusive, highKeyInclusive,
+ new BTreeDataflowHelperFactory());
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
- }
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ btreeSearchOp, spPc.second);
+ }
- @SuppressWarnings("rawtypes")
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
- AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
- String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
- throws AlgebricksException {
- String itemTypeName = ddecl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ @SuppressWarnings("rawtypes")
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
+ AqlCompiledMetadataDeclarations metadata, JobGenContext context,
+ JobSpecification jobSpec, String datasetName,
+ AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
+ throws AlgebricksException {
+ String itemTypeName = ddecl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- boolean isSecondary = true;
- AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
- if (primIdxDecl != null) {
- isSecondary = !indexName.equals(primIdxDecl.getIndexName());
- }
+ boolean isSecondary = true;
+ AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
+ if (primIdxDecl != null) {
+ isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+ }
- int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
- ISerializerDeserializer[] recordFields;
- IBinaryComparatorFactory[] comparatorFactories;
- ITypeTraits[] typeTraits;
- IPrimitiveValueProviderFactory[] valueProviderFactories;
- int numSecondaryKeys = 0;
- int numNestedSecondaryKeyFields = 0;
- int i = 0;
- if (isSecondary) {
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
- if (cid == null) {
- throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
- + datasetName);
- }
- List<String> secondaryKeyFields = cid.getFieldExprs();
- numSecondaryKeys = secondaryKeyFields.size();
+ int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl)
+ .size();
+ ISerializerDeserializer[] recordFields;
+ IBinaryComparatorFactory[] comparatorFactories;
+ ITypeTraits[] typeTraits;
+ IPrimitiveValueProviderFactory[] valueProviderFactories;
+ int numSecondaryKeys = 0;
+ int numNestedSecondaryKeyFields = 0;
+ int i = 0;
+ if (isSecondary) {
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+ ddecl, indexName);
+ if (cid == null) {
+ throw new AlgebricksException(
+ "Code generation error: no index " + indexName
+ + " for dataset " + datasetName);
+ }
+ List<String> secondaryKeyFields = cid.getFieldExprs();
+ numSecondaryKeys = secondaryKeyFields.size();
- if (numSecondaryKeys != 1) {
- throw new AlgebricksException(
- "Cannot use "
- + numSecondaryKeys
- + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
- }
+ if (numSecondaryKeys != 1) {
+ throw new AlgebricksException(
+ "Cannot use "
+ + numSecondaryKeys
+ + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+ }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException(
+ "Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyFields.get(0), recType);
- if (keyType == null) {
- throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
- }
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(
+ secondaryKeyFields.get(0), recType);
+ if (keyType == null) {
+ throw new AlgebricksException("Could not find field "
+ + secondaryKeyFields.get(0) + " in the schema.");
+ }
- int dimension = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
- numNestedSecondaryKeyFields = dimension * 2;
+ int dimension = NonTaggedFormatUtil.getNumDimensions(keyType
+ .getTypeTag());
+ numNestedSecondaryKeyFields = dimension * 2;
- int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
- recordFields = new ISerializerDeserializer[numFields];
- typeTraits = new ITypeTraits[numFields];
- comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
- valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+ int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
+ recordFields = new ISerializerDeserializer[numFields];
+ typeTraits = new ITypeTraits[numFields];
+ comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+ valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
- IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
- for (i = 0; i < numNestedSecondaryKeyFields; i++) {
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(nestedKeyType);
- recordFields[i] = keySerde;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- nestedKeyType, OrderKind.ASC);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
- } else {
- throw new AlgebricksException("R-tree can only be used as a secondary index");
- }
+ IAType nestedKeyType = NonTaggedFormatUtil
+ .getNestedSpatialType(keyType.getTypeTag());
+ for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ recordFields[i] = keySerde;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(nestedKeyType,
+ OrderKind.ASC);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+ } else {
+ throw new AlgebricksException(
+ "R-tree can only be used as a secondary index");
+ }
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(ddecl)) {
- IAType keyType = evalFactoryAndType.third;
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(keyType);
- recordFields[i] = keySerde;
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(ddecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(keyType);
+ recordFields[i] = keySerde;
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(
- new RTreeTypeAwareTupleWriterFactory(typeTraits), valueProviderFactories);
- ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(new RTreeTypeAwareTupleWriterFactory(
- typeTraits), valueProviderFactories);
- /*
- ITreeIndexFrameFactory interiorFrameFactory = JobGenHelper.createRTreeNSMInteriorFrameFactory(typeTraits,
- numNestedSecondaryKeyFields);
- ITreeIndexFrameFactory leafFrameFactory = JobGenHelper.createRTreeNSMLeafFrameFactory(typeTraits,
- numNestedSecondaryKeyFields);
- */
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+ ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(
+ new RTreeTypeAwareTupleWriterFactory(typeTraits),
+ valueProviderFactories);
+ ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(
+ new RTreeTypeAwareTupleWriterFactory(typeTraits),
+ valueProviderFactories);
+ /*
+ * ITreeIndexFrameFactory interiorFrameFactory =
+ * JobGenHelper.createRTreeNSMInteriorFrameFactory(typeTraits,
+ * numNestedSecondaryKeyFields); ITreeIndexFrameFactory leafFrameFactory
+ * = JobGenHelper.createRTreeNSMLeafFrameFactory(typeTraits,
+ * numNestedSecondaryKeyFields);
+ */
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+ .getAppContext();
+ RecordDescriptor recDesc = new RecordDescriptor(recordFields);
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
- appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(), spPc.first,
- interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories, keyFields,
- new RTreeDataflowHelperFactory());
+ RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(
+ jobSpec, recDesc, appContext.getStorageManagerInterface(),
+ appContext.getTreeRegisterProvider(), spPc.first,
+ interiorFrameFactory, leafFrameFactory, typeTraits,
+ comparatorFactories, keyFields,
+ new RTreeDataflowHelperFactory());
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
- }
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ rtreeSearchOp, spPc.second);
+ }
- @Override
- public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
- int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
- FileSplitDataSink fsds = (FileSplitDataSink) sink;
- FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
- FileSplit fs = fssi.getFileSplit();
- File outFile = fs.getLocalFile().getFile();
- String nodeId = fs.getNodeName();
+ @Override
+ public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(
+ IDataSink sink, int[] printColumns,
+ IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+ FileSplitDataSink fsds = (FileSplitDataSink) sink;
+ FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
+ FileSplit fs = fssi.getFileSplit();
+ File outFile = fs.getLocalFile().getFile();
+ String nodeId = fs.getNodeName();
- SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
- metadata.getWriterFactory(), inputDesc);
- AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
- return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
- }
+ SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(
+ printColumns, printerFactories, outFile, metadata
+ .getWriterFactory(), inputDesc);
+ AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(
+ new String[] { nodeId });
+ return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(
+ runtime, apc);
+ }
- @Override
- public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
- throws AlgebricksException {
- AqlDataSource ads = findDataSource(dataSourceId);
- AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
- if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
- throw new AlgebricksException("No index for external dataset " + dataSourceId);
- }
+ @Override
+ public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(
+ String indexId, AqlSourceId dataSourceId)
+ throws AlgebricksException {
+ AqlDataSource ads = findDataSource(dataSourceId);
+ AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
+ if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+ throw new AlgebricksException("No index for external dataset "
+ + dataSourceId);
+ }
- String idxName = (String) indexId;
- AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(adecl, idxName);
- AqlSourceId asid = (AqlSourceId) dataSourceId;
- if (acid != null) {
- return new AqlIndex(acid, metadata, asid.getDatasetName());
- } else {
- AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
- if (primIdx.getIndexName().equals(indexId)) {
- return new AqlIndex(primIdx, metadata, asid.getDatasetName());
- } else {
- return null;
- }
- }
- }
+ String idxName = (String) indexId;
+ AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(
+ adecl, idxName);
+ AqlSourceId asid = (AqlSourceId) dataSourceId;
+ if (acid != null) {
+ return new AqlIndex(acid, metadata, asid.getDatasetName());
+ } else {
+ AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
+ if (primIdx.getIndexName().equals(indexId)) {
+ return new AqlIndex(primIdx, metadata, asid.getDatasetName());
+ } else {
+ return null;
+ }
+ }
+ }
- public static AqlDataSource lookupSourceInMetadata(AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
- throws AlgebricksException {
- if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
- return null;
- }
- AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId.getDatasetName());
- if (acdd == null) {
- throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
- }
- String tName = acdd.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(tName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- return new AqlDataSource(aqlId, acdd, itemType);
- }
+ public static AqlDataSource lookupSourceInMetadata(
+ AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
+ throws AlgebricksException {
+ if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
+ return null;
+ }
+ AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId
+ .getDatasetName());
+ if (acdd == null) {
+ throw new AlgebricksException("Datasource with id " + aqlId
+ + " was not found.");
+ }
+ String tName = acdd.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(tName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ return new AqlDataSource(aqlId, acdd, itemType);
+ }
- @Override
- public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
- AqlSourceId asid = dataSource.getId();
- String datasetName = asid.getDatasetName();
- AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
- if (adecl == null) {
- throw new IllegalArgumentException("Unknown dataset " + datasetName);
- }
- return adecl.getDatasetType() == DatasetType.EXTERNAL;
- }
+ @Override
+ public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
+ AqlSourceId asid = dataSource.getId();
+ String datasetName = asid.getDatasetName();
+ AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
+ if (adecl == null) {
+ throw new IllegalArgumentException("Unknown dataset " + datasetName);
+ }
+ return adecl.getDatasetType() == DatasetType.EXTERNAL;
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, JobGenContext context, JobSpecification spec) throws AlgebricksException {
- String datasetName = dataSource.getId().getDatasetName();
- int numKeys = keys.size();
- // move key fields to front
- int[] fieldPermutation = new int[numKeys + 1];
- // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
+ IDataSource<AqlSourceId> dataSource,
+ IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+ LogicalVariable payload, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ String datasetName = dataSource.getId().getDatasetName();
+ int numKeys = keys.size();
+ // move key fields to front
+ int[] fieldPermutation = new int[numKeys + 1];
+ // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+ int i = 0;
+ for (LogicalVariable varKey : keys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+ .findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
+ .getIndexName();
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
+ compiledDatasetDecl, metadata);
- ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
- ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+ ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+ .getAppContext();
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
+ .computeKeysBinaryComparatorFactories(compiledDatasetDecl,
+ context.getBinaryComparatorFactoryProvider());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
- indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
- splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
- fieldPermutation, GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
- }
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
+ spec, appContext.getStorageManagerInterface(), appContext
+ .getTreeRegisterProvider(), splitsAndConstraint.first,
+ interiorFrameFactory, leafFrameFactory, typeTraits,
+ comparatorFactories, fieldPermutation,
+ GlobalConfig.DEFAULT_BTREE_FILL_FACTOR,
+ new BTreeDataflowHelperFactory());
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ btreeBulkLoad, splitsAndConstraint.second);
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
- String datasetName = dataSource.getId().getDatasetName();
- int numKeys = keys.size();
- // move key fields to front
- int[] fieldPermutation = new int[numKeys + 1];
- // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
+ IDataSource<AqlSourceId> dataSource,
+ IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+ LogicalVariable payload, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ String datasetName = dataSource.getId().getDatasetName();
+ int numKeys = keys.size();
+ // move key fields to front
+ int[] fieldPermutation = new int[numKeys + 1];
+ // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+ int i = 0;
+ for (LogicalVariable varKey : keys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+ .findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
+ .getIndexName();
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
+ compiledDatasetDecl, metadata);
- ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
- ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+ ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+ .getAppContext();
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
+ .computeKeysBinaryComparatorFactories(compiledDatasetDecl,
+ context.getBinaryComparatorFactoryProvider());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
- indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
- splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), fieldPermutation, IndexOp.INSERT, txnId);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
- }
+ TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getTreeRegisterProvider(),
+ splitsAndConstraint.first, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(), fieldPermutation,
+ IndexOp.INSERT, txnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ btreeBulkLoad, splitsAndConstraint.second);
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
- IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
- String datasetName = dataSource.getId().getDatasetName();
- int numKeys = keys.size();
- // move key fields to front
- int[] fieldPermutation = new int[numKeys + 1];
- // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
+ IDataSource<AqlSourceId> dataSource,
+ IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+ LogicalVariable payload, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ String datasetName = dataSource.getId().getDatasetName();
+ int numKeys = keys.size();
+ // move key fields to front
+ int[] fieldPermutation = new int[numKeys + 1];
+ // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+ int i = 0;
+ for (LogicalVariable varKey : keys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+ .findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
+ .getIndexName();
- ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
+ ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
+ compiledDatasetDecl, metadata);
- ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
- ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+ ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+ .getAppContext();
- IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
- compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
+ .computeKeysBinaryComparatorFactories(compiledDatasetDecl,
+ context.getBinaryComparatorFactoryProvider());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
- indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
- TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
- splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), fieldPermutation, IndexOp.DELETE, txnId);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
- }
+ TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getTreeRegisterProvider(),
+ splitsAndConstraint.first, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(), fieldPermutation,
+ IndexOp.DELETE, txnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ btreeBulkLoad, splitsAndConstraint.second);
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
- IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec) throws AlgebricksException {
- String indexName = dataSourceIndex.getId();
- String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
+ IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ String indexName = dataSourceIndex.getId();
+ String datasetName = dataSourceIndex.getDataSource().getId()
+ .getDatasetName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+ .findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+ compiledDatasetDecl, indexName);
- if (cid.getKind() == IndexKind.BTREE)
- return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, recordDesc,
- context, spec, IndexOp.INSERT);
- else
- return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, recordDesc,
- context, spec, IndexOp.INSERT);
- }
+ if (cid.getKind() == IndexKind.BTREE)
+ return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
+ primaryKeys, secondaryKeys, recordDesc, context, spec,
+ IndexOp.INSERT);
+ else
+ return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
+ primaryKeys, secondaryKeys, recordDesc, context, spec,
+ IndexOp.INSERT);
+ }
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
- IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
- List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec) throws AlgebricksException {
- String indexName = dataSourceIndex.getId();
- String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
- if (cid.getKind() == IndexKind.BTREE)
- return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, recordDesc,
- context, spec, IndexOp.DELETE);
- else
- return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys, recordDesc,
- context, spec, IndexOp.DELETE);
- }
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+ IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
+ IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec)
+ throws AlgebricksException {
+ String indexName = dataSourceIndex.getId();
+ String datasetName = dataSourceIndex.getDataSource().getId()
+ .getDatasetName();
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+ .findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+ compiledDatasetDecl, indexName);
+ if (cid.getKind() == IndexKind.BTREE)
+ return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
+ primaryKeys, secondaryKeys, recordDesc, context, spec,
+ IndexOp.DELETE);
+ else
+ return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
+ primaryKeys, secondaryKeys, recordDesc, context, spec,
+ IndexOp.DELETE);
+ }
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String datasetName,
- String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
- int numKeys = primaryKeys.size() + secondaryKeys.size();
- // generate field permutations
- int[] fieldPermutation = new int[numKeys];
- int i = 0;
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(
+ String datasetName, String indexName,
+ IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, IndexOp indexOp)
+ throws AlgebricksException {
+ int numKeys = primaryKeys.size() + secondaryKeys.size();
+ // generate field permutations
+ int[] fieldPermutation = new int[numKeys];
+ int i = 0;
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
- // dataset
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- if (compiledDatasetDecl == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName);
- }
- String itemTypeName = compiledDatasetDecl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
+ // dataset
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+ .findDataset(datasetName);
+ if (compiledDatasetDecl == null) {
+ throw new AlgebricksException("Unknown dataset " + datasetName);
+ }
+ String itemTypeName = compiledDatasetDecl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
- // index parameters
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
- List<String> secondaryKeyExprs = cid.getFieldExprs();
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
- for (i = 0; i < secondaryKeys.size(); ++i) {
- IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyExprs.get(i).toString(), recType);
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- OrderKind.ASC);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- }
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(compiledDatasetDecl)) {
- IAType keyType = evalFactoryAndType.third;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- OrderKind.ASC);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ // index parameters
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+ compiledDatasetDecl, indexName);
+ List<String> secondaryKeyExprs = cid.getFieldExprs();
+ ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+ for (i = 0; i < secondaryKeys.size(); ++i) {
+ IAType keyType = AqlCompiledIndexDecl.keyFieldType(
+ secondaryKeyExprs.get(i).toString(), recType);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(keyType, OrderKind.ASC);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(compiledDatasetDecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(keyType, OrderKind.ASC);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
- ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
- indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
- splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), fieldPermutation, indexOp, txnId);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
- }
+ ITreeIndexFrameFactory interiorFrameFactory = createBTreeNSMInteriorFrameFactory(typeTraits);
+ ITreeIndexFrameFactory leafFrameFactory = createBTreeNSMLeafFrameFactory(typeTraits);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+ .getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getTreeRegisterProvider(),
+ splitsAndConstraint.first, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories,
+ new BTreeDataflowHelperFactory(), fieldPermutation, indexOp,
+ txnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ btreeBulkLoad, splitsAndConstraint.second);
+ }
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String datasetName,
- String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
- AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
- String itemTypeName = compiledDatasetDecl.getItemTypeName();
- IAType itemType;
- try {
- itemType = metadata.findType(itemTypeName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Only record types can be indexed.");
- }
- ARecordType recType = (ARecordType) itemType;
- AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
- List<String> secondaryKeyExprs = cid.getFieldExprs();
- IAType spatialType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyExprs.get(0).toString(), recType);
- int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
- int numSecondaryKeys = dimension * 2;
- int numPrimaryKeys = primaryKeys.size();
- int numKeys = numSecondaryKeys + numPrimaryKeys;
- ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
- IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
- int[] fieldPermutation = new int[numKeys];
- int i = 0;
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(
+ String datasetName, String indexName,
+ IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, IndexOp indexOp)
+ throws AlgebricksException {
+ AqlCompiledDatasetDecl compiledDatasetDecl = metadata
+ .findDataset(datasetName);
+ String itemTypeName = compiledDatasetDecl.getItemTypeName();
+ IAType itemType;
+ try {
+ itemType = metadata.findType(itemTypeName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Only record types can be indexed.");
+ }
+ ARecordType recType = (ARecordType) itemType;
+ AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
+ compiledDatasetDecl, indexName);
+ List<String> secondaryKeyExprs = cid.getFieldExprs();
+ IAType spatialType = AqlCompiledIndexDecl.keyFieldType(
+ secondaryKeyExprs.get(0).toString(), recType);
+ int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType
+ .getTypeTag());
+ int numSecondaryKeys = dimension * 2;
+ int numPrimaryKeys = primaryKeys.size();
+ int numKeys = numSecondaryKeys + numPrimaryKeys;
+ ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+ int[] fieldPermutation = new int[numKeys];
+ int i = 0;
- for (LogicalVariable varKey : secondaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- for (LogicalVariable varKey : primaryKeys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- i++;
- }
- IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
- IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
- for (i = 0; i < numSecondaryKeys; i++) {
- ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(nestedKeyType);
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- nestedKeyType, OrderKind.ASC);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
- valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
- }
- for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
- .getPartitioningFunctions(compiledDatasetDecl)) {
- IAType keyType = evalFactoryAndType.third;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
- OrderKind.ASC);
- typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
- ++i;
- }
+ for (LogicalVariable varKey : secondaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = propagatedSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ i++;
+ }
+ IAType nestedKeyType = NonTaggedFormatUtil
+ .getNestedSpatialType(spatialType.getTypeTag());
+ IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
+ for (i = 0; i < numSecondaryKeys; i++) {
+ ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(nestedKeyType);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(nestedKeyType, OrderKind.ASC);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE
+ .getTypeTrait(nestedKeyType);
+ valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+ }
+ for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+ .getPartitioningFunctions(compiledDatasetDecl)) {
+ IAType keyType = evalFactoryAndType.third;
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(keyType, OrderKind.ASC);
+ typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ ++i;
+ }
- ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(
- new RTreeTypeAwareTupleWriterFactory(typeTraits), valueProviderFactories);
- ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(new RTreeTypeAwareTupleWriterFactory(
- typeTraits), valueProviderFactories);
+ ITreeIndexFrameFactory interiorFrameFactory = new RTreeNSMInteriorFrameFactory(
+ new RTreeTypeAwareTupleWriterFactory(typeTraits),
+ valueProviderFactories);
+ ITreeIndexFrameFactory leafFrameFactory = new RTreeNSMLeafFrameFactory(
+ new RTreeTypeAwareTupleWriterFactory(typeTraits),
+ valueProviderFactories);
- /*
- ITreeIndexFrameFactory interiorFrameFactory = JobGenHelper.createRTreeNSMInteriorFrameFactory(typeTraits,
- numSecondaryKeys);
- ITreeIndexFrameFactory leafFrameFactory = JobGenHelper.createRTreeNSMLeafFrameFactory(typeTraits,
- numSecondaryKeys);
- */
+ /*
+ * ITreeIndexFrameFactory interiorFrameFactory =
+ * JobGenHelper.createRTreeNSMInteriorFrameFactory(typeTraits,
+ * numSecondaryKeys); ITreeIndexFrameFactory leafFrameFactory =
+ * JobGenHelper.createRTreeNSMLeafFrameFactory(typeTraits,
+ * numSecondaryKeys);
+ */
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
- try {
- splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
- indexName);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getTreeRegisterProvider(),
- splitsAndConstraint.first, interiorFrameFactory, leafFrameFactory, typeTraits, comparatorFactories,
- new RTreeDataflowHelperFactory(), fieldPermutation, indexOp, txnId);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
- }
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
+ .getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+ try {
+ splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
+ datasetName, indexName);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+ spec, recordDesc, appContext.getStorageManagerInterface(),
+ appContext.getTreeRegisterProvider(),
+ splitsAndConstraint.first, interiorFrameFactory,
+ leafFrameFactory, typeTraits, comparatorFactories,
+ new RTreeDataflowHelperFactory(), fieldPermutation, indexOp,
+ txnId);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
+ rtreeUpdate, splitsAndConstraint.second);
+ }
- public long getTxnId() {
- return txnId;
- }
+ public long getTxnId() {
+ return txnId;
+ }
- public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
- return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
- }
+ public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(
+ ITypeTraits[] typeTraits) {
+ return new BTreeNSMInteriorFrameFactory(
+ new TypeAwareTupleWriterFactory(typeTraits));
+ }
- public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(ITypeTraits[] typeTraits) {
- return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
- }
+ public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(
+ ITypeTraits[] typeTraits) {
+ return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ }
- @Override
- public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
- return AsterixBuiltinFunctions.lookupFunction(fid);
- }
+ @Override
+ public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+ return AsterixBuiltinFunctions.lookupFunction(fid);
+ }
}