Refactored External Data
This change re-arrange asterix module's order. asterix-
external-data is moved in front of asterix-metadata.
Change-Id: I46b60b5e1cc37fd59adc0dd89f374d96502091b2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/559
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
index ac8e775..9b85674 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/ExternalDataLookupPOperator.java
@@ -21,12 +21,11 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.asterix.external.indexing.dataflow.HDFSLookupAdapterFactory;
import org.apache.asterix.metadata.declared.AqlDataSource;
+import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.declared.AqlSourceId;
import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
@@ -70,7 +69,8 @@
private boolean retainNull;
public ExternalDataLookupPOperator(AqlSourceId datasetId, Dataset dataset, ARecordType recordType,
- Index secondaryIndex, List<LogicalVariable> ridVarList, boolean requiresBroadcast, boolean retainInput, boolean retainNull) {
+ Index secondaryIndex, List<LogicalVariable> ridVarList, boolean requiresBroadcast, boolean retainInput,
+ boolean retainNull) {
this.datasetId = datasetId;
this.dataset = dataset;
this.recordType = recordType;
@@ -123,7 +123,7 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
ExternalDataLookupOperator edabro = (ExternalDataLookupOperator) op;
ILogicalExpression expr = edabro.getExpressionRef().getValue();
if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
@@ -144,7 +144,7 @@
}
AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> externalLoopup = HDFSLookupAdapterFactory
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> externalLoopup = AqlMetadataProvider
.buildExternalDataLookupRuntime(builder.getJobSpec(), dataset, secondaryIndex, ridIndexes, retainInput,
typeEnv, outputVars, opSchema, context, metadataProvider, retainNull);
builder.contributeHyracksOperator(edabro, externalLoopup.first);
@@ -164,6 +164,7 @@
return keyIndexes;
}
+ @Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
if (requiresBroadcast) {
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
index d2c36cc..1ff3df6 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -29,11 +29,12 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.AqlSourceId;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.AInt32;
@@ -278,7 +279,8 @@
throws AlgebricksException {
int numPrimaryKeys = 0;
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- numPrimaryKeys = IndexingConstants.getRIDSize(dataset);
+ numPrimaryKeys = IndexingConstants
+ .getRIDSize(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
} else {
numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
}
@@ -293,7 +295,8 @@
ILogicalOperator unnestMapOp) {
int numPrimaryKeys;
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- numPrimaryKeys = IndexingConstants.getRIDSize(dataset);
+ numPrimaryKeys = IndexingConstants
+ .getRIDSize(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
} else {
numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
}
@@ -567,7 +570,8 @@
}
private static void appendExternalRecPrimaryKeys(Dataset dataset, List<Object> target) throws AsterixException {
- int numPrimaryKeys = IndexingConstants.getRIDSize(dataset);
+ int numPrimaryKeys = IndexingConstants
+ .getRIDSize(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
for (int i = 0; i < numPrimaryKeys; i++) {
target.add(IndexingConstants.getFieldType(i));
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 51e1612..714e05c 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -63,6 +63,8 @@
import org.apache.asterix.common.feeds.api.IFeedLifecycleListener.ConnectionLocation;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.feeds.CentralFeedManager;
import org.apache.asterix.feeds.FeedJoint;
import org.apache.asterix.feeds.FeedLifecycleListener;
@@ -127,7 +129,6 @@
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.Feed.FeedType;
import org.apache.asterix.metadata.entities.FeedPolicy;
@@ -139,7 +140,6 @@
import org.apache.asterix.metadata.entities.SecondaryFeed;
import org.apache.asterix.metadata.feeds.FeedLifecycleEventSubscriber;
import org.apache.asterix.metadata.feeds.FeedUtil;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
import org.apache.asterix.metadata.utils.MetadataLockManager;
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
index 1b7bd1e..d3fade5 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/ExternalIndexingOperations.java
@@ -48,6 +48,9 @@
import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
import org.apache.asterix.external.adapter.factory.HiveAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.indexing.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
import org.apache.asterix.external.indexing.operators.ExternalDatasetIndexesCommitOperatorDescriptor;
import org.apache.asterix.external.indexing.operators.ExternalDatasetIndexesRecoverOperatorDescriptor;
@@ -60,10 +63,7 @@
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.external.FilesIndexDescription;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
index cd2f304..c7bb0e3 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/FeedOperations.java
@@ -33,11 +33,11 @@
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.common.feeds.message.EndFeedMessage;
import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
import org.apache.asterix.feeds.FeedLifecycleListener;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.PrimaryFeed;
import org.apache.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.metadata.feeds.PrepareStallMessage;
import org.apache.asterix.metadata.feeds.TerminateDataFlowMessage;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
index c5870a6..4cb8a0f 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -26,10 +26,10 @@
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.util.AsterixAppContextInfo;
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
index 7c14e5d..a579d2c 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -30,9 +30,9 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
import org.apache.asterix.om.types.IAType;
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
index 0cc137b..4eb6944 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -36,6 +36,8 @@
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.indexing.operators.ExternalIndexBulkModifyOperatorDescriptor;
import org.apache.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -44,8 +46,6 @@
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.om.types.ARecordType;
diff --git a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
index 6777d82..4d887dc 100644
--- a/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
+++ b/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -30,12 +30,12 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
import org.apache.asterix.om.types.ATypeTag;
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
index 7cb2fd7..c6e04df 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.feeds.CentralFeedManager;
import org.apache.asterix.file.ExternalIndexingOperations;
import org.apache.asterix.metadata.MetadataManager;
@@ -38,7 +39,6 @@
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
index 51d857a..267be3d 100755
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ExternalLibraryBootstrap.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.external.library.ExternalLibrary;
+import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.external.library.LibraryAdapter;
import org.apache.asterix.external.library.LibraryFunction;
import org.apache.asterix.metadata.MetadataManager;
@@ -46,7 +47,6 @@
import org.apache.asterix.metadata.entities.DatasourceAdapter.AdapterType;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.feeds.AdapterIdentifier;
-import org.apache.asterix.metadata.functions.ExternalLibraryManager;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
public class ExternalLibraryBootstrap {
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 86e466e..4062b23 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -17,184 +17,174 @@
! under the License.
!-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>asterix</artifactId>
- <groupId>org.apache.asterix</groupId>
- <version>0.8.8-SNAPSHOT</version>
- </parent>
-
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
-
- <artifactId>asterix-external-data</artifactId>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.jvnet.jaxb2.maven2</groupId>
- <artifactId>maven-jaxb2-plugin</artifactId>
- <version>0.9.0</version>
- <executions>
- <execution>
- <id>configuration</id>
- <goals>
- <goal>generate</goal>
- </goals>
- <configuration>
- <schemaDirectory>src/main/resources/schema</schemaDirectory>
- <schemaIncludes>
- <include>library.xsd</include>
- </schemaIncludes>
- <generatePackage>org.apache.asterix.external.library</generatePackage>
- <generateDirectory>${project.build.directory}/generated-sources/configuration</generateDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <includes>
- <include>**/*.class</include>
- <include>**/*.txt</include>
- </includes>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- <phase>package</phase>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.2-beta-5</version>
- <executions>
- <execution>
- <configuration>
- <descriptor>src/main/assembly/binary-assembly-libzip.xml</descriptor>
- <finalName>testlib-zip</finalName>
- </configuration>
- <phase>package</phase>
- <goals>
- <goal>attached</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- <type>jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-om</artifactId>
- <version>0.8.8-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-runtime</artifactId>
- <version>0.8.8-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-metadata</artifactId>
- <version>0.8.8-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>algebricks-compiler</artifactId>
- </dependency>
- <dependency>
- <groupId>com.kenai.nbpwr</groupId>
- <artifactId>org-apache-commons-io</artifactId>
- <version>1.3.1-201002241208</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-core</artifactId>
- <version>[4.0,)</version>
- </dependency>
- <dependency>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>asterix</artifactId>
+ <groupId>org.apache.asterix</groupId>
+ <version>0.8.8-SNAPSHOT</version>
+ </parent>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <artifactId>asterix-external-data</artifactId>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.jvnet.jaxb2.maven2</groupId>
+ <artifactId>maven-jaxb2-plugin</artifactId>
+ <version>0.9.0</version>
+ <executions>
+ <execution>
+ <id>configuration</id>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <configuration>
+ <schemaDirectory>src/main/resources/schema</schemaDirectory>
+ <schemaIncludes>
+ <include>library.xsd</include>
+ </schemaIncludes>
+ <generatePackage>org.apache.asterix.external.library</generatePackage>
+ <generateDirectory>${project.build.directory}/generated-sources/configuration</generateDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <includes>
+ <include>**/*.class</include>
+ <include>**/*.txt</include>
+ </includes>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>package</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptor>src/main/assembly/binary-assembly-libzip.xml</descriptor>
+ <finalName>testlib-zip</finalName>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <type>jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-runtime</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-compiler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.kenai.nbpwr</groupId>
+ <artifactId>org-apache-commons-io</artifactId>
+ <version>1.3.1-201002241208</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-core</artifactId>
+ <version>[4.0,)</version>
+ </dependency>
+ <dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>[4.0,)</version>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>net.java.dev.rome</groupId>
- <artifactId>rome-fetcher</artifactId>
- <version>1.0.0</version>
- <type>jar</type>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <artifactId>rome</artifactId>
- <groupId>net.java.dev.rome</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>rome</groupId>
- <artifactId>rome</artifactId>
- <version>1.0.1-modified-01</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-hdfs-core</artifactId>
- <version>${hyracks.version}</version>
- </dependency>
- <dependency>
- <groupId>jdom</groupId>
- <artifactId>jdom</artifactId>
- <version>1.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-common</artifactId>
- <version>0.8.8-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.microsoft.windowsazure</groupId>
- <artifactId>microsoft-windowsazure-api</artifactId>
- <version>0.4.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>0.13.0</version>
- </dependency>
- <dependency>
- <groupId>javax.jdo</groupId>
- <artifactId>jdo2-api</artifactId>
- <version>2.3-20090302111651</version>
- </dependency>
- </dependencies>
-</project>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.java.dev.rome</groupId>
+ <artifactId>rome-fetcher</artifactId>
+ <version>1.0.0</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>rome</artifactId>
+ <groupId>net.java.dev.rome</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>rome</groupId>
+ <artifactId>rome</artifactId>
+ <version>1.0.1-modified-01</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>jdom</groupId>
+ <artifactId>jdom</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.microsoft.windowsazure</groupId>
+ <artifactId>microsoft-windowsazure-api</artifactId>
+ <version>0.4.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>0.13.0</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.jdo</groupId>
+ <artifactId>jdo2-api</artifactId>
+ <version>2.3-20090302111651</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index a74ae4d..8b7b6d5 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.external.dataset.adapter.RSSFeedAdapter;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
index cc4e2ef..ebf41cc 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -26,6 +26,17 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
+import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -33,19 +44,6 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
-import org.apache.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactory;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.IAdapterFactory;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
-import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.ICCContext;
@@ -106,8 +104,8 @@
ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
Scheduler scheduler = null;
try {
- scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
- .getClusterControllerInfo().getClientNetPort());
+ scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
+ ccContext.getClusterControllerInfo().getClientNetPort());
} catch (HyracksException e) {
throw new IllegalStateException("Cannot obtain hdfs scheduler");
}
@@ -145,15 +143,15 @@
public static JobConf configureJobConf(Map<String, String> configuration) throws Exception {
JobConf conf = new JobConf();
- String formatClassName = (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim());
- String localShortCircuitSocketPath = (String) configuration.get(KEY_LOCAL_SOCKET_PATH);
+ String formatClassName = formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim());
+ String localShortCircuitSocketPath = configuration.get(KEY_LOCAL_SOCKET_PATH);
if (formatClassName == null) {
- formatClassName = ((String) configuration.get(KEY_INPUT_FORMAT)).trim();
+ formatClassName = configuration.get(KEY_INPUT_FORMAT).trim();
}
- conf.set(KEY_HADOOP_FILESYSTEM_URI, ((String) configuration.get(KEY_HDFS_URL)).trim());
+ conf.set(KEY_HADOOP_FILESYSTEM_URI, configuration.get(KEY_HDFS_URL).trim());
conf.set(KEY_HADOOP_FILESYSTEM_CLASS, CLASS_NAME_HDFS_FILESYSTEM);
conf.setClassLoader(HDFSAdapter.class.getClassLoader());
- conf.set(KEY_HADOOP_INPUT_DIR, ((String) configuration.get(KEY_PATH)).trim());
+ conf.set(KEY_HADOOP_INPUT_DIR, configuration.get(KEY_PATH).trim());
conf.set(KEY_HADOOP_INPUT_FORMAT, formatClassName);
// Enable local short circuit reads if user supplied the parameters
@@ -169,7 +167,7 @@
if (!configured) {
throw new IllegalStateException("Adapter factory has not been configured yet");
}
- return (AlgebricksPartitionConstraint) clusterLocations;
+ return clusterLocations;
}
@Override
@@ -199,7 +197,7 @@
Arrays.fill(executed, false);
configured = true;
- atype = (IAType) outputType;
+ atype = outputType;
configureFormat(atype);
}
@@ -241,26 +239,26 @@
* if data is text data (adm or delimited text), it will use a text tuple parser,
* otherwise it will use hdfs record object parser
*/
+ @Override
protected void configureFormat(IAType sourceDatatype) throws Exception {
- String specifiedFormat = (String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT);
- if (specifiedFormat == null) {
- throw new IllegalArgumentException(" Unspecified data format");
- }
-
- if(AsterixTupleParserFactory.FORMAT_BINARY.equalsIgnoreCase(specifiedFormat)){
- parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
- } else {
- InputDataFormat inputFormat = InputDataFormat.UNKNOWN;
- if (AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
- inputFormat = InputDataFormat.DELIMITED;
- } else if (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
- inputFormat = InputDataFormat.ADM;
- }
- parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype
- , inputFormat);
- }
-
- }
+ String specifiedFormat = configuration.get(AsterixTupleParserFactory.KEY_FORMAT);
+ if (specifiedFormat == null) {
+ throw new IllegalArgumentException(" Unspecified data format");
+ }
+
+ if (AsterixTupleParserFactory.FORMAT_BINARY.equalsIgnoreCase(specifiedFormat)) {
+ parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
+ } else {
+ InputDataFormat inputFormat = InputDataFormat.UNKNOWN;
+ if (AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
+ inputFormat = InputDataFormat.DELIMITED;
+ } else if (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
+ inputFormat = InputDataFormat.ADM;
+ }
+ parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype, inputFormat);
+ }
+
+ }
/**
* Instead of creating the split using the input format, we do it manually
@@ -296,9 +294,11 @@
// Create a split per block
for (BlockLocation block : fileBlocks) {
if (block.getOffset() < file.getSize()) {
- fileSplits.add(new FileSplit(filePath, block.getOffset(), (block.getLength() + block
- .getOffset()) < file.getSize() ? block.getLength() : (file.getSize() - block
- .getOffset()), block.getHosts()));
+ fileSplits
+ .add(new FileSplit(filePath,
+ block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
+ ? block.getLength() : (file.getSize() - block.getOffset()),
+ block.getHosts()));
orderedExternalFiles.add(file);
}
}
@@ -322,11 +322,11 @@
if (block.getOffset() < newSize) {
// Block interact with delta -> Create a split
long startCut = (block.getOffset() > oldSize) ? 0L : oldSize - block.getOffset();
- long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L : block.getOffset()
- + block.getLength() - newSize;
+ long endCut = (block.getOffset() + block.getLength() < newSize) ? 0L
+ : block.getOffset() + block.getLength() - newSize;
long splitLength = block.getLength() - startCut - endCut;
- fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength, block
- .getHosts()));
+ fileSplits.add(new FileSplit(filePath, block.getOffset() + startCut, splitLength,
+ block.getHosts()));
orderedExternalFiles.add(file);
}
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 0033e5c..11e2b96 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -23,9 +23,6 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.external.dataset.adapter.HDFSIndexingAdapter;
import org.apache.asterix.external.indexing.dataflow.HDFSIndexingParserFactory;
@@ -39,6 +36,8 @@
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.asterix.runtime.operators.file.DelimitedDataParser;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -75,8 +74,8 @@
ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
IndexingScheduler scheduler = null;
try {
- scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
- .getClusterControllerInfo().getClientNetPort());
+ scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
+ ccContext.getClusterControllerInfo().getClientNetPort());
} catch (HyracksException e) {
throw new IllegalStateException("Cannot obtain hdfs scheduler");
}
@@ -98,7 +97,7 @@
if (!configured) {
throw new IllegalStateException("Adapter factory has not been configured yet");
}
- return (AlgebricksPartitionConstraint) clusterLocations;
+ return clusterLocations;
}
@Override
@@ -131,11 +130,12 @@
executed = new boolean[readSchedule.length];
Arrays.fill(executed, false);
configured = true;
- atype = (IAType) outputType;
+ atype = outputType;
// The function below is overwritten to create indexing adapter factory instead of regular adapter factory
configureFormat(atype);
}
+ @Override
protected void configureFormat(IAType sourceDatatype) throws Exception {
char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
index b61f5f3..553682e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -24,8 +24,7 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.external.dataset.adapter.HDFSAdapter;
import org.apache.asterix.external.dataset.adapter.HiveAdapter;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.IAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
@@ -67,7 +66,6 @@
return "hive";
}
-
@Override
public SupportedOperation getSupportedOperations() {
return SupportedOperation.READ;
@@ -78,13 +76,13 @@
if (!configured) {
populateConfiguration(configuration);
hdfsAdapterFactory.configure(configuration, outputType);
- this.atype = (IAType) outputType;
+ this.atype = outputType;
}
}
public static void populateConfiguration(Map<String, String> configuration) throws Exception {
/** configure hive */
- String database = (String) configuration.get(HIVE_DATABASE);
+ String database = configuration.get(HIVE_DATABASE);
String tablePath = null;
if (database == null) {
tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
@@ -93,14 +91,17 @@
+ configuration.get(HIVE_TABLE);
}
configuration.put(HDFSAdapterFactory.KEY_PATH, tablePath);
- if (!configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
- throw new IllegalArgumentException("format" + configuration.get(AsterixTupleParserFactory.KEY_FORMAT) + " is not supported");
+ if (!configuration.get(AsterixTupleParserFactory.KEY_FORMAT)
+ .equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
+ throw new IllegalArgumentException(
+ "format" + configuration.get(AsterixTupleParserFactory.KEY_FORMAT) + " is not supported");
}
- if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT) || configuration
- .get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) {
- throw new IllegalArgumentException("file input format"
- + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
+ if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)
+ || configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT)
+ .equals(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE))) {
+ throw new IllegalArgumentException(
+ "file input format" + configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT) + " is not supported");
}
}
@@ -109,7 +110,6 @@
return hdfsAdapterFactory.getPartitionConstraint();
}
-
@Override
public ARecordType getAdapterOutputType() {
return (ARecordType) atype;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
similarity index 98%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IAdapterFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
index 4d841bd..b8005cd 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IAdapterFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.external;
+package org.apache.asterix.external.adapter.factory;
import java.io.Serializable;
import java.util.Map;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IControlledAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
similarity index 87%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IControlledAdapterFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
index 4f66e55..0de6fad 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IControlledAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IControlledAdapterFactory.java
@@ -16,11 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.external;
+package org.apache.asterix.external.adapter.factory;
import java.io.Serializable;
import java.util.Map;
+import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
similarity index 90%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IFeedAdapterFactory.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
index 0e09564..9358a52 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/IFeedAdapterFactory.java
@@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.feeds;
+package org.apache.asterix.external.adapter.factory;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.metadata.external.IAdapterFactory;
public interface IFeedAdapterFactory extends IAdapterFactory {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 61bf4a3..251d69a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -26,11 +26,10 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.external.dataset.adapter.NCFileSystemAdapter;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.util.DNSResolverFactory;
import org.apache.asterix.external.util.INodeResolver;
import org.apache.asterix.external.util.INodeResolverFactory;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.IAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
@@ -58,7 +57,6 @@
private FileSplit[] fileSplits;
private ARecordType outputType;
-
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(fileSplits, parserFactory, sourceDatatype, ctx);
@@ -70,7 +68,6 @@
return NC_FILE_SYSTEM_ADAPTER_NAME;
}
-
@Override
public SupportedOperation getSupportedOperations() {
return SupportedOperation.READ;
@@ -80,8 +77,8 @@
public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
this.outputType = outputType;
- String[] splits = ((String) configuration.get(AsterixTupleParserFactory.KEY_PATH)).split(",");
- IAType sourceDatatype = (IAType) outputType;
+ String[] splits = configuration.get(AsterixTupleParserFactory.KEY_PATH).split(",");
+ IAType sourceDatatype = outputType;
configureFileSplits(splits);
configureFormat(sourceDatatype);
@@ -102,8 +99,8 @@
for (String splitPath : splits) {
trimmedValue = splitPath.trim();
if (!trimmedValue.contains("://")) {
- throw new AsterixException("Invalid path: " + splitPath
- + "\nUsage- path=\"Host://Absolute File Path\"");
+ throw new AsterixException(
+ "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
}
nodeName = trimmedValue.split(":")[0];
nodeLocalPath = trimmedValue.split("://")[1];
@@ -132,7 +129,8 @@
private static INodeResolver initializeNodeResolver() {
INodeResolver nodeResolver = null;
- String configuredNodeResolverFactory = System.getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
+ String configuredNodeResolverFactory = System
+ .getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
if (configuredNodeResolverFactory != null) {
try {
nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
@@ -150,12 +148,12 @@
}
return nodeResolver;
}
-
+
@Override
public ARecordType getAdapterOutputType() {
return outputType;
}
-
+
@Override
public InputDataFormat getInputDataFormat() {
return InputDataFormat.UNKNOWN;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
index bf92480..fbde1b4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -25,13 +25,7 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.external.dataset.adapter.PullBasedAzureTwitterAdapter;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.entities.Datatype;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -42,7 +36,6 @@
private static final String INGESTOR_LOCATIONS_KEY = "ingestor-locations";
private static final String PARTITIONS_KEY = "partitions";
- private static final String OUTPUT_TYPE_KEY = "output-type";
private static final String TABLE_NAME_KEY = "table-name";
private static final String ACCOUNT_NAME_KEY = "account-name";
private static final String ACCOUNT_KEY_KEY = "account-key";
@@ -78,8 +71,8 @@
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- return new PullBasedAzureTwitterAdapter(azureAccountName, azureAccountKey, tableName, partitions,
- configuration, ctx, outputType);
+ return new PullBasedAzureTwitterAdapter(azureAccountName, azureAccountKey, tableName, partitions, configuration,
+ ctx, outputType);
}
@Override
@@ -120,38 +113,6 @@
throw new AsterixException("Invalid adapter configuration: number of ingestion-locations ("
+ nIngestLocations + ") must be the same as the number of partitions (" + nPartitions + ")");
}
- configureType();
- }
-
- private void configureType() throws Exception {
- String fqOutputType = configuration.get(OUTPUT_TYPE_KEY);
-
- if (fqOutputType == null) {
- throw new IllegalArgumentException("No output type specified");
- }
- String[] dataverseAndType = fqOutputType.split("[.]");
- String dataverseName = dataverseAndType[0];
- String datatypeName = dataverseAndType[1];
-
- MetadataTransactionContext ctx = null;
- MetadataManager.INSTANCE.acquireReadLatch();
- try {
- ctx = MetadataManager.INSTANCE.beginTransaction();
- Datatype t = MetadataManager.INSTANCE.getDatatype(ctx, dataverseName, datatypeName);
- IAType type = t.getDatatype();
- if (type.getTypeTag() != ATypeTag.RECORD) {
- throw new IllegalStateException();
- }
- outputType = (ARecordType) t.getDatatype();
- MetadataManager.INSTANCE.commitTransaction(ctx);
- } catch (Exception e) {
- if (ctx != null) {
- MetadataManager.INSTANCE.abortTransaction(ctx);
- }
- throw e;
- } finally {
- MetadataManager.INSTANCE.releaseReadLatch();
- }
}
@Override
@@ -167,7 +128,5 @@
public FeedPolicyAccessor getIngestionPolicy() {
return ingestionPolicy;
}
-
-
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index c05df33..7d2dd73 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -28,7 +28,6 @@
import org.apache.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
import org.apache.asterix.external.util.TwitterUtil;
import org.apache.asterix.external.util.TwitterUtil.SearchAPIConstants;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
index 2606063..5bfdbcf 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
@@ -25,7 +25,6 @@
import org.apache.asterix.external.dataset.adapter.PushBasedTwitterAdapter;
import org.apache.asterix.external.util.TwitterUtil;
import org.apache.asterix.external.util.TwitterUtil.AuthenticationConstants;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index 8f47aa2..4d893fe 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -26,7 +26,6 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.external.dataset.adapter.RSSFeedAdapter;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -68,7 +67,7 @@
public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
this.outputType = outputType;
- String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
+ String rssURLProperty = configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/StreamBasedAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index e37cf3e..c7e582f 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -22,7 +22,6 @@
import java.util.logging.Logger;
import org.apache.asterix.external.util.INodeResolver;
-import org.apache.asterix.metadata.external.IAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
@@ -42,7 +41,8 @@
public abstract InputDataFormat getInputDataFormat();
protected void configureFormat(IAType sourceDatatype) throws Exception {
- parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype, getInputDataFormat());
+ parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype,
+ getInputDataFormat());
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
index 26d4d8f..a197368 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
@@ -25,7 +25,7 @@
import org.apache.asterix.common.feeds.api.IFeedAdapter;
import org.apache.asterix.common.parse.ITupleForwardPolicy;
import org.apache.asterix.external.dataset.adapter.IFeedClient.InflowState;
-import org.apache.asterix.metadata.feeds.FeedPolicyEnforcer;
+import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -138,6 +138,7 @@
*
* @throws Exception
*/
+ @Override
public void stop() throws Exception {
continueIngestion = false;
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
index df0926b..5f1b1ae 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -23,11 +23,11 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.input.GenericFileAwareRecordReader;
import org.apache.asterix.external.indexing.input.GenericRecordReader;
import org.apache.asterix.external.indexing.input.TextualDataReader;
import org.apache.asterix.external.indexing.input.TextualFullScanDataReader;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.hadoop.mapred.InputSplit;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
index 7da182a..92a049d0 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
@@ -22,16 +22,15 @@
import java.io.InputStream;
import java.util.List;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-
import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.input.GenericFileAwareRecordReader;
import org.apache.asterix.external.indexing.input.RCFileDataReader;
import org.apache.asterix.external.indexing.input.TextualDataReader;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IControlledAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
similarity index 96%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IControlledAdapter.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
index cd3702e..e71f10c 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IControlledAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/IControlledAdapter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.external;
+package org.apache.asterix.external.dataset.adapter;
import java.io.Serializable;
import java.nio.ByteBuffer;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
index c878b36..d6b4ba7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -23,8 +23,8 @@
import java.util.logging.Logger;
import org.apache.asterix.external.dataset.adapter.IPullBasedFeedClient.InflowState;
-import org.apache.asterix.metadata.feeds.FeedPolicyEnforcer;
-import org.apache.asterix.metadata.feeds.IPullBasedFeedAdapter;
+import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
+import org.apache.asterix.external.feeds.IPullBasedFeedAdapter;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -136,7 +136,8 @@
}
private void appendTupleToFrame(IFrameWriter writer) throws HyracksDataException {
- if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0, tupleBuilder.getSize())) {
+ if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
+ tupleBuilder.getSize())) {
appender.flush(writer, true);
if (!appender.append(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
tupleBuilder.getSize())) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedPolicyEnforcer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
similarity index 97%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedPolicyEnforcer.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
index 4b39446..ae5c050 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedPolicyEnforcer.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/FeedPolicyEnforcer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.feeds;
+package org.apache.asterix.external.feeds;
import java.rmi.RemoteException;
import java.util.Map;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IPullBasedFeedAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
similarity index 95%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IPullBasedFeedAdapter.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
index 9e75452..62052af 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/IPullBasedFeedAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feeds/IPullBasedFeedAdapter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.feeds;
+package org.apache.asterix.external.feeds;
import org.apache.asterix.common.feeds.api.IFeedAdapter;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalFile.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
similarity index 87%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalFile.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
index 75532f3..3988f1a 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/ExternalFile.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFile.java
@@ -17,15 +17,14 @@
* under the License.
*/
-package org.apache.asterix.metadata.entities;
+package org.apache.asterix.external.indexing;
+import java.io.Serializable;
import java.util.Date;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.metadata.MetadataCache;
-import org.apache.asterix.metadata.api.IMetadataEntity;
-public class ExternalFile implements IMetadataEntity, Comparable<ExternalFile> {
+public class ExternalFile implements Serializable, Comparable<ExternalFile> {
/**
* A class for metadata entity externalFile
@@ -109,18 +108,6 @@
}
@Override
- public Object addToCache(MetadataCache cache) {
- return null;
- //return cache.addExternalFileIfNotExists(this);
- }
-
- @Override
- public Object dropFromCache(MetadataCache cache) {
- return null;
- //return cache.dropExternalFile(this);
- }
-
- @Override
public boolean equals(Object obj) {
if (obj == null)
return false;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalFileIndexAccessor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
similarity index 91%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalFileIndexAccessor.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
index ac975c4..b10379b 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalFileIndexAccessor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.external;
+package org.apache.asterix.external.indexing;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
@@ -24,8 +24,8 @@
import java.io.Serializable;
import java.util.Date;
+import org.apache.asterix.external.indexing.operators.ExternalLoopkupOperatorDiscriptor;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AMutableInt32;
@@ -71,7 +71,8 @@
private ILSMIndexAccessorInternal fileIndexAccessor;
private IIndexCursor fileIndexSearchCursor;
- public ExternalFileIndexAccessor(ExternalBTreeDataflowHelper indexDataflowHelper, ExternalLoopkupOperatorDiscriptor opDesc) {
+ public ExternalFileIndexAccessor(ExternalBTreeDataflowHelper indexDataflowHelper,
+ ExternalLoopkupOperatorDiscriptor opDesc) {
this.indexDataflowHelper = indexDataflowHelper;
this.opDesc = opDesc;
}
@@ -129,12 +130,14 @@
}
private void setExternalFileFromARecord(ARecord externalFileRecord, ExternalFile file) {
- file.setFileName(((AString) externalFileRecord
- .getValueByPos(FilesIndexDescription.EXTERNAL_FILE_NAME_FIELD_INDEX)).getStringValue());
+ file.setFileName(
+ ((AString) externalFileRecord.getValueByPos(FilesIndexDescription.EXTERNAL_FILE_NAME_FIELD_INDEX))
+ .getStringValue());
file.setSize(((AInt64) externalFileRecord.getValueByPos(FilesIndexDescription.EXTERNAL_FILE_SIZE_FIELD_INDEX))
.getLongValue());
- file.setLastModefiedTime((new Date(((ADateTime) externalFileRecord
- .getValueByPos(FilesIndexDescription.EXTERNAL_FILE_MOD_DATE_FIELD_INDEX)).getChrononTime())));
+ file.setLastModefiedTime((new Date(
+ ((ADateTime) externalFileRecord.getValueByPos(FilesIndexDescription.EXTERNAL_FILE_MOD_DATE_FIELD_INDEX))
+ .getChrononTime())));
}
public void closeIndex() throws HyracksDataException {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/FilesIndexDescription.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FilesIndexDescription.java
similarity index 96%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/external/FilesIndexDescription.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FilesIndexDescription.java
index cb4c0d2..fa81583 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/FilesIndexDescription.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/FilesIndexDescription.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.external;
+package org.apache.asterix.external.indexing;
import java.io.IOException;
@@ -64,8 +64,7 @@
public FilesIndexDescription() {
ARecordType type;
try {
- type = new ARecordType("ExternalFileRecordType", payloadFieldNames,
- payloadFieldTypes, true);
+ type = new ARecordType("ExternalFileRecordType", payloadFieldNames, payloadFieldTypes, true);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
@@ -98,8 +97,7 @@
public void getBuddyBTreeTupleFromFileNumber(ArrayTupleReference tuple, ArrayTupleBuilder tupleBuilder,
AMutableInt32 aInt32) throws IOException, AsterixException {
tupleBuilder.reset();
- FILE_BUDDY_BTREE_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32,
- tupleBuilder.getDataOutput());
+ FILE_BUDDY_BTREE_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IndexingConstants.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
similarity index 84%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IndexingConstants.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
index 4754e07..249bd12 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/IndexingConstants.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingConstants.java
@@ -16,18 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.external;
+package org.apache.asterix.external.indexing;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -56,12 +55,14 @@
.getBinaryComparatorFactory(BuiltinType.AINT64, true);
private static final IBinaryComparatorFactory rowNumberCompFactory = AqlBinaryComparatorFactoryProvider.INSTANCE
.getBinaryComparatorFactory(BuiltinType.AINT32, true);
-
- private static final IBinaryComparatorFactory[] rCFileRIDComparatorFactories = {fileNumberCompFactory, recordOffsetCompFactory, rowNumberCompFactory};
- private static final IBinaryComparatorFactory[] txtSeqFileRIDComparatorFactories = {fileNumberCompFactory, recordOffsetCompFactory};
- private static final IBinaryComparatorFactory[] buddyBtreeComparatorFactories = {fileNumberCompFactory};
-
+ private static final IBinaryComparatorFactory[] rCFileRIDComparatorFactories = { fileNumberCompFactory,
+ recordOffsetCompFactory, rowNumberCompFactory };
+ private static final IBinaryComparatorFactory[] txtSeqFileRIDComparatorFactories = { fileNumberCompFactory,
+ recordOffsetCompFactory };
+
+ private static final IBinaryComparatorFactory[] buddyBtreeComparatorFactories = { fileNumberCompFactory };
+
//Serdes
private static ISerializerDeserializer fileNumberSerializerDeserializer;
private static ISerializerDeserializer recordOffsetSerializerDeserializer;
@@ -79,7 +80,7 @@
public static final int FILE_NUMBER_FIELD_INDEX = 0;
public static final int RECORD_OFFSET_FIELD_INDEX = 1;
public static final int ROW_NUMBER_FIELD_INDEX = 2;
-
+
public static final ArrayList<List<String>> RecordIDFields = new ArrayList<List<String>>();
static {
@@ -98,7 +99,7 @@
fileNumberEvalFactory = new TupleFieldEvaluatorFactory(1);
recordOffsetEvalFactory = new TupleFieldEvaluatorFactory(2);
rowNumberEvalFactory = new TupleFieldEvaluatorFactory(3);
-
+
// Add field names
RecordIDFields.add(new ArrayList<String>(Arrays.asList("FileNumber")));
RecordIDFields.add(new ArrayList<String>(Arrays.asList("RecordOffset")));
@@ -112,15 +113,15 @@
else
return 2;
}
-
- // This function returns the size of the RID for the passed file input format
+
+ // This function returns the size of the RID for the passed file input format
public static IBinaryComparatorFactory[] getComparatorFactories(String fileInputFormat) {
if (fileInputFormat.equals(INPUT_FORMAT_RC) || fileInputFormat.equals(INPUT_FORMAT_RC_FULLY_QUALIFIED))
return rCFileRIDComparatorFactories;
else
return txtSeqFileRIDComparatorFactories;
}
-
+
public static IAType getFieldType(int fieldNumber) throws AsterixException {
switch (fieldNumber) {
case 0:
@@ -133,9 +134,8 @@
throw new AsterixException("Unknown external field RID number");
}
}
-
- public static IBinaryComparatorFactory getComparatorFactory(int fieldNumber)
- throws AsterixException {
+
+ public static IBinaryComparatorFactory getComparatorFactory(int fieldNumber) throws AsterixException {
switch (fieldNumber) {
case 0:
return fileNumberCompFactory;
@@ -147,9 +147,8 @@
throw new AsterixException("Unknown external field RID number");
}
}
-
- public static ISerializerDeserializer getSerializerDeserializer(int fieldNumber)
- throws AsterixException {
+
+ public static ISerializerDeserializer getSerializerDeserializer(int fieldNumber) throws AsterixException {
switch (fieldNumber) {
case 0:
return fileNumberSerializerDeserializer;
@@ -161,9 +160,8 @@
throw new AsterixException("Unknown external field RID number");
}
}
-
- public static ITypeTraits getTypeTraits(int fieldNumber)
- throws AsterixException {
+
+ public static ITypeTraits getTypeTraits(int fieldNumber) throws AsterixException {
switch (fieldNumber) {
case 0:
return fileNumberTypeTraits;
@@ -175,30 +173,30 @@
throw new AsterixException("Unknown external field RID number");
}
}
-
- public static IScalarEvaluatorFactory getEvalFactory(int fieldNumber) throws AsterixException{
+
+ public static IScalarEvaluatorFactory getEvalFactory(int fieldNumber) throws AsterixException {
switch (fieldNumber) {
- case 0:
- return fileNumberEvalFactory;
- case 1:
- return recordOffsetEvalFactory;
- case 2:
- return rowNumberEvalFactory;
- default:
- throw new AsterixException("Unknown external field RID number");
- }
+ case 0:
+ return fileNumberEvalFactory;
+ case 1:
+ return recordOffsetEvalFactory;
+ case 2:
+ return rowNumberEvalFactory;
+ default:
+ throw new AsterixException("Unknown external field RID number");
+ }
}
public static IBinaryComparatorFactory[] getBuddyBtreeComparatorFactories() {
return buddyBtreeComparatorFactories;
}
- public static int getRIDSize(Dataset dataset) {
- return getRIDSize(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties().get(KEY_INPUT_FORMAT));
+ public static int getRIDSize(Map<String, String> properties) {
+ return getRIDSize(properties.get(KEY_INPUT_FORMAT));
}
- public static List<List<String>> getRIDKeys(Dataset dataset) {
- String fileInputFormat = ((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties().get(KEY_INPUT_FORMAT);
+ public static List<List<String>> getRIDKeys(Map<String, String> properties) {
+ String fileInputFormat = properties.get(KEY_INPUT_FORMAT);
if (fileInputFormat.equals(INPUT_FORMAT_RC) || fileInputFormat.equals(INPUT_FORMAT_RC_FULLY_QUALIFIED))
return RecordIDFields;
else
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
index 3f7ff33..c94be6a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java
@@ -23,8 +23,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.indexing.input.AbstractHDFSLookupInputStream;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.types.ARecordType;
@@ -72,7 +72,7 @@
public AdmOrDelimitedControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType,
AbstractHDFSLookupInputStream in, boolean propagateInput, RecordDescriptor inRecDesc, IDataParser parser,
int[] propagatedFields, int[] ridFields, boolean retainNull, INullWriterFactory iNullWriterFactory)
- throws HyracksDataException {
+ throws HyracksDataException {
this.recType = recType;
this.in = in;
this.propagateInput = propagateInput;
@@ -136,16 +136,15 @@
} else {
// Get file number
bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset);
- int fileNumber = ((AInt32) inRecDesc.getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]]
- .deserialize(dis)).getIntegerValue();
+ int fileNumber = ((AInt32) inRecDesc
+ .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis))
+ .getIntegerValue();
// Get record offset
- bbis.setByteBuffer(
- frameBuffer,
- tupleStartOffset
- + tupleAccessor.getFieldStartOffset(tupleIndex,
- ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
- long recordOffset = ((AInt64) inRecDesc.getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]]
- .deserialize(dis)).getLongValue();
+ bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
+ ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
+ long recordOffset = ((AInt64) inRecDesc
+ .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]].deserialize(dis))
+ .getLongValue();
found = in.fetchRecord(fileNumber, recordOffset);
}
if (found) {
@@ -202,16 +201,14 @@
int tc = tupleAccessor.getTupleCount();
System.err.println("TC: " + tc);
for (int i = 0; i < tc; ++i) {
- System.err.print(i + ":(" + tupleAccessor.getTupleStartOffset(i) + ", "
- + tupleAccessor.getTupleEndOffset(i) + ")[");
+ System.err.print(
+ i + ":(" + tupleAccessor.getTupleStartOffset(i) + ", " + tupleAccessor.getTupleEndOffset(i) + ")[");
for (int j = 0; j < tupleAccessor.getFieldCount(); ++j) {
System.err.print(j + ":(" + tupleAccessor.getFieldStartOffset(i, j) + ", "
+ tupleAccessor.getFieldEndOffset(i, j) + ") ");
System.err.print("{");
- bbis.setByteBuffer(
- tupleAccessor.getBuffer(),
- tupleAccessor.getTupleStartOffset(i) + tupleAccessor.getFieldSlotsLength()
- + tupleAccessor.getFieldStartOffset(i, j));
+ bbis.setByteBuffer(tupleAccessor.getBuffer(), tupleAccessor.getTupleStartOffset(i)
+ + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(i, j));
try {
byte tag = dis.readByte();
if (tag == nullByte) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
index ebba65b..9271ebe 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java
@@ -22,9 +22,9 @@
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.FilesIndexDescription;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AMutableDateTime;
@@ -42,25 +42,30 @@
@SuppressWarnings("unchecked")
public class FileIndexTupleTranslator {
private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription();
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFieldCount());
+ private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(
+ filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFieldCount());
private RecordBuilder recordBuilder = new RecordBuilder();
private ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
private AMutableInt32 aInt32 = new AMutableInt32(0);
private AMutableInt64 aInt64 = new AMutableInt64(0);
private AMutableString aString = new AMutableString(null);
private AMutableDateTime aDateTime = new AMutableDateTime(0);
- private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
- private ISerializerDeserializer<ADateTime> dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
- private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ private ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ASTRING);
+ private ISerializerDeserializer<ADateTime> dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.ADATETIME);
+ private ISerializerDeserializer<AInt64> longSerde = AqlSerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(BuiltinType.AINT64);
private ArrayTupleReference tuple = new ArrayTupleReference();
-
- public ITupleReference getTupleFromFile(ExternalFile file) throws IOException, AsterixException{
+
+ public ITupleReference getTupleFromFile(ExternalFile file) throws IOException, AsterixException {
tupleBuilder.reset();
//File Number
aInt32.setValue(file.getFileNumber());
- filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32, tupleBuilder.getDataOutput());
+ filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32,
+ tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
-
+
//File Record
recordBuilder.reset(filesIndexDescription.EXTERNAL_FILE_RECORD_TYPE);
// write field 0 (File Name)
@@ -68,19 +73,19 @@
aString.setValue(file.getFileName());
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(0, fieldValue);
-
+
//write field 1 (File Size)
fieldValue.reset();
aInt64.setValue(file.getSize());
longSerde.serialize(aInt64, fieldValue.getDataOutput());
recordBuilder.addField(1, fieldValue);
-
+
//write field 2 (File Mod Date)
fieldValue.reset();
aDateTime.setValue(file.getLastModefiedTime().getTime());
dateTimeSerde.serialize(aDateTime, fieldValue.getDataOutput());
recordBuilder.addField(2, fieldValue);
-
+
//write the record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
index c6013a9..d9ce7aa 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
@@ -25,13 +25,13 @@
import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory;
+import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.asterix.external.indexing.input.RCFileLookupReader;
import org.apache.asterix.external.indexing.input.SequenceFileLookupInputStream;
import org.apache.asterix.external.indexing.input.SequenceFileLookupReader;
import org.apache.asterix.external.indexing.input.TextFileLookupInputStream;
import org.apache.asterix.external.indexing.input.TextFileLookupReader;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
-import org.apache.asterix.metadata.external.IControlledAdapter;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.operators.file.ADMDataParser;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
index 505e1a9..fab507d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java
@@ -18,46 +18,14 @@
*/
package org.apache.asterix.external.indexing.dataflow;
-import java.util.List;
import java.util.Map;
-import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
-import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.metadata.MetadataException;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
-import org.apache.asterix.metadata.external.ExternalLoopkupOperatorDiscriptor;
-import org.apache.asterix.metadata.external.IControlledAdapter;
-import org.apache.asterix.metadata.external.IControlledAdapterFactory;
-import org.apache.asterix.metadata.external.IndexingConstants;
-import org.apache.asterix.metadata.utils.DatasetUtils;
-import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
-import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.external.adapter.factory.IControlledAdapterFactory;
+import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
-import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
// This class takes care of creating the adapter based on the formats and input format
public class HDFSLookupAdapterFactory implements IControlledAdapterFactory {
@@ -112,71 +80,4 @@
}
}
}
-
- /*
- * This function creates an operator that uses the built indexes in asterix to perform record lookup over external data
- */
- public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
- JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
- JobGenContext context, AqlMetadataProvider metadataProvider, boolean retainNull) throws AlgebricksException {
-
- // Get data type
- IAType itemType = null;
- try {
- itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
- dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
- } catch (MetadataException e) {
- e.printStackTrace();
- throw new AlgebricksException("Unable to get item type from metadata " + e);
- }
- if (itemType.getTypeTag() != ATypeTag.RECORD) {
- throw new AlgebricksException("Can only scan datasets of records.");
- }
-
- // Create the adapter factory <- right now there is only one. if there are more in the future, we can create a map->
- ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
- HDFSLookupAdapterFactory adapterFactory = new HDFSLookupAdapterFactory();
- adapterFactory.configure(itemType, retainInput, ridIndexes, datasetDetails.getProperties(), retainNull);
-
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
- try {
- compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
- } catch (MetadataException e) {
- throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
- }
-
- boolean temp = dataset.getDatasetDetails().isTemp();
- // Create the file index data flow helper
- ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
- compactionInfo.first, compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE, metadataProvider.getStorageProperties()
- .getBloomFilterFalsePositiveRate(), ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(
- dataset, metadataProvider), !temp);
-
- // Create the out record descriptor, appContext and fileSplitProvider for the files index
- RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
- IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
- try {
- spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
- dataset.getDatasetName(),
- dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
-
- ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
- : new SecondaryIndexSearchOperationCallbackFactory();
- // Create the operator
- ExternalLoopkupOperatorDiscriptor op = new ExternalLoopkupOperatorDiscriptor(jobSpec, adapterFactory,
- outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
- appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(), metadataProvider
- .getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory, retainNull,
- context.getNullWriterFactory());
-
- // Return value
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
- }
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileControlledTupleParser.java
index 4549e56..c8e9c65 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileControlledTupleParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/RCFileControlledTupleParser.java
@@ -23,8 +23,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.indexing.input.RCFileLookupReader;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.types.ATypeTag;
@@ -127,23 +127,17 @@
// Get file number
bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset);
int fileNumber = ((AInt32) inRecDesc
- .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]]
- .deserialize(dis)).getIntegerValue();
+ .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis))
+ .getIntegerValue();
// Get record group offset
- bbis.setByteBuffer(
- frameBuffer,
- tupleStartOffset
- + tupleAccessor.getFieldStartOffset(tupleIndex,
- ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
+ bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
+ ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
long recordOffset = ((AInt64) inRecDesc
- .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]]
- .deserialize(dis)).getLongValue();
+ .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]].deserialize(dis))
+ .getLongValue();
// Get row number
- bbis.setByteBuffer(
- frameBuffer,
- tupleStartOffset
- + tupleAccessor.getFieldStartOffset(tupleIndex,
- ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX]));
+ bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
+ ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX]));
int rowNumber = ((AInt32) inRecDesc.getFields()[ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX]]
.deserialize(dis)).getIntegerValue();
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/SeqOrTxtControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/SeqOrTxtControlledTupleParser.java
index 2e3097c..23ddd8a 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/SeqOrTxtControlledTupleParser.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/SeqOrTxtControlledTupleParser.java
@@ -23,8 +23,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.indexing.input.ILookupReader;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.types.ATypeTag;
@@ -60,9 +60,9 @@
protected byte nullByte;
protected ArrayTupleBuilder nullTupleBuild;
- public SeqOrTxtControlledTupleParser(IHyracksTaskContext ctx, IAsterixHDFSRecordParser parser,
- ILookupReader reader, boolean propagateInput, int[] propagatedFields, RecordDescriptor inRecDesc,
- int[] ridFields, boolean retainNull, INullWriterFactory iNullWriterFactory) throws HyracksDataException {
+ public SeqOrTxtControlledTupleParser(IHyracksTaskContext ctx, IAsterixHDFSRecordParser parser, ILookupReader reader,
+ boolean propagateInput, int[] propagatedFields, RecordDescriptor inRecDesc, int[] ridFields,
+ boolean retainNull, INullWriterFactory iNullWriterFactory) throws HyracksDataException {
appender = new FrameTupleAppender(new VSizeFrame(ctx));
this.parser = parser;
this.reader = reader;
@@ -125,16 +125,15 @@
} else {
// Get file number
bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset);
- int fileNumber = ((AInt32) inRecDesc.getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]]
- .deserialize(dis)).getIntegerValue();
+ int fileNumber = ((AInt32) inRecDesc
+ .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis))
+ .getIntegerValue();
// Get record offset
- bbis.setByteBuffer(
- frameBuffer,
- tupleStartOffset
- + tupleAccessor.getFieldStartOffset(tupleIndex,
- ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
- long recordOffset = ((AInt64) inRecDesc.getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]]
- .deserialize(dis)).getLongValue();
+ bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex,
+ ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]));
+ long recordOffset = ((AInt64) inRecDesc
+ .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]].deserialize(dis))
+ .getLongValue();
// Read the record
record = reader.read(fileNumber, recordOffset);
}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
index 2bd2a95..563a46d 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/AbstractHDFSLookupInputStream.java
@@ -23,8 +23,8 @@
import java.io.InputStream;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
index 6dbf464..ba36407 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/GenericFileAwareRecordReader.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.List;
-import org.apache.asterix.metadata.entities.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
index 4b89f59..50853d4 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileDataReader.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.List;
-import org.apache.asterix.metadata.entities.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
index 15eb1e2..f312228 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/RCFileLookupReader.java
@@ -21,6 +21,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -30,10 +33,6 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
-
public class RCFileLookupReader {
private FileSystem fs;
private Configuration conf;
@@ -85,7 +84,7 @@
// Seek to the record group if needed
if (recordGroupOffset != this.recordGroupOffset) {
this.recordGroupOffset = recordGroupOffset;
- if(reader.getPosition() != recordGroupOffset)
+ if (reader.getPosition() != recordGroupOffset)
reader.seek(recordGroupOffset);
reader.resetBuffer();
this.rowNumber = -1;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
index 27e11fd1..e787921 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupInputStream.java
@@ -20,6 +20,7 @@
import java.io.IOException;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -28,31 +29,28 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
-
-
@SuppressWarnings("deprecation")
-public class SequenceFileLookupInputStream extends AbstractHDFSLookupInputStream{
+public class SequenceFileLookupInputStream extends AbstractHDFSLookupInputStream {
private SequenceFile.Reader reader;
private Writable seqKey;
private Text seqValue = new Text();
private Configuration conf;
-
- public SequenceFileLookupInputStream(ExternalFileIndexAccessor fileIndexAccessor, JobConf conf) throws IOException{
+
+ public SequenceFileLookupInputStream(ExternalFileIndexAccessor fileIndexAccessor, JobConf conf) throws IOException {
super(fileIndexAccessor, conf);
this.conf = conf;
}
-
+
@Override
protected void openFile(String fileName) throws IOException {
if (reader != null) {
reader.close();
}
reader = new SequenceFile.Reader(fs, new Path(fileName), conf);
- seqKey = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ seqKey = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
}
-
+
@Override
public void close() throws IOException {
if (reader != null) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
index ed146e9..76b3660 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/SequenceFileLookupReader.java
@@ -21,6 +21,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -30,10 +33,6 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
-
public class SequenceFileLookupReader implements ILookupReader {
private Reader reader;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
index 1bf2572..ea82c18 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupInputStream.java
@@ -20,12 +20,11 @@
import java.io.IOException;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
-
public class TextFileLookupInputStream extends AbstractHDFSLookupInputStream {
private HDFSSeekableLineReader lineReader = new HDFSSeekableLineReader();
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
index 1214f43..5864df2 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextFileLookupReader.java
@@ -21,16 +21,15 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.ExternalFileIndexAccessor;
-
public class TextFileLookupReader implements ILookupReader {
private FileSystem fs;
private int fileNumber = -1;
@@ -78,14 +77,15 @@
}
private void openFile(String FileName) throws IOException {
- if(lineReader.getReader() != null){
+ if (lineReader.getReader() != null) {
lineReader.getReader().close();
}
lineReader.resetReader(fs.open(new Path(FileName)));
}
+ @Override
public void close() {
- if (lineReader.getReader() != null){
+ if (lineReader.getReader() != null) {
try {
lineReader.getReader().close();
} catch (IOException e) {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
index 797b961..5e4f013 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/input/TextualDataReader.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.List;
-import org.apache.asterix.metadata.entities.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index cc7a212..a9c9ac7 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -20,7 +20,7 @@
import java.nio.ByteBuffer;
-import org.apache.asterix.metadata.external.FilesIndexDescription;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
import org.apache.asterix.om.base.AMutableInt32;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
similarity index 94%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
index be1603b..ca2e7ca 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/operators/ExternalLoopkupOperatorDiscriptor.java
@@ -16,10 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.external;
+package org.apache.asterix.external.indexing.operators;
import java.nio.ByteBuffer;
+import org.apache.asterix.external.adapter.factory.IControlledAdapterFactory;
+import org.apache.asterix.external.dataset.adapter.IControlledAdapter;
+import org.apache.asterix.external.indexing.ExternalFileIndexAccessor;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index e769ad1..14f831b 100755
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -22,7 +22,6 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import org.apache.asterix.metadata.functions.ExternalLibraryManager;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalLibraryManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
similarity index 97%
rename from asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalLibraryManager.java
rename to asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index 296d7ee..629c770 100755
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalLibraryManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.metadata.functions;
+package org.apache.asterix.external.library;
import java.util.HashMap;
import java.util.Map;
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 0100ea1..c177a58 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -22,7 +22,8 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
diff --git a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index e6f1999..a62abaa 100644
--- a/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.feeds.FeedId;
import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
import org.apache.asterix.lang.aql.parser.AQLParserFactory;
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.IParserFactory;
@@ -46,7 +47,6 @@
import org.apache.asterix.metadata.entities.PrimaryFeed;
import org.apache.asterix.metadata.entities.SecondaryFeed;
import org.apache.asterix.metadata.feeds.FeedUtil;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Triple;
diff --git a/asterix-metadata/pom.xml b/asterix-metadata/pom.xml
index 301b06c..d665a15 100644
--- a/asterix-metadata/pom.xml
+++ b/asterix-metadata/pom.xml
@@ -17,71 +17,73 @@
! under the License.
!-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>asterix</artifactId>
- <groupId>org.apache.asterix</groupId>
- <version>0.8.8-SNAPSHOT</version>
- </parent>
- <artifactId>asterix-metadata</artifactId>
-
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-common</artifactId>
- <version>0.8.8-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-om</artifactId>
- <version>0.8.8-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-runtime</artifactId>
- <version>0.8.8-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-events</artifactId>
- <version>0.8.8-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-lsm-btree</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-lsm-rtree</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-hdfs-core</artifactId>
- <version>${hyracks.version}</version>
- </dependency>
-
- </dependencies>
-</project>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>asterix</artifactId>
+ <groupId>org.apache.asterix</groupId>
+ <version>0.8.8-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-metadata</artifactId>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-runtime</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-events</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-btree</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-lsm-rtree</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-external-data</artifactId>
+ <version>0.8.8-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 602cb4c..d748ef8 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
import org.apache.asterix.metadata.api.IMetadataManager;
import org.apache.asterix.metadata.api.IMetadataNode;
@@ -37,7 +38,6 @@
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicy;
import org.apache.asterix.metadata.entities.Function;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index c402bef..9f897f5 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.transactions.AbstractOperationCallback;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
@@ -49,7 +50,6 @@
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicy;
import org.apache.asterix.metadata.entities.Function;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 285e36c..9d85fb5 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -31,7 +32,6 @@
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicy;
import org.apache.asterix.metadata.entities.Function;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index 68955a8..59a8f76 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -27,13 +27,13 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.entities.CompactionPolicy;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicy;
import org.apache.asterix.metadata.entities.Function;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index d76af86..f22b2f1 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -39,6 +39,8 @@
import org.apache.asterix.common.config.IAsterixPropertiesProvider;
import org.apache.asterix.common.context.BaseOperationTracker;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
@@ -51,7 +53,6 @@
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.FeedPolicy;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
@@ -59,7 +60,6 @@
import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
import org.apache.asterix.metadata.entities.Node;
import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.metadata.external.IAdapterFactory;
import org.apache.asterix.metadata.feeds.AdapterIdentifier;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.asterix.om.types.BuiltinType;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index d31dfcf..d61d323 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -56,6 +56,13 @@
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.external.indexing.dataflow.HDFSLookupAdapterFactory;
+import org.apache.asterix.external.indexing.operators.ExternalLoopkupOperatorDiscriptor;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.AqlLinearizeComparatorFactoryProvider;
@@ -72,21 +79,16 @@
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedPolicy;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.metadata.external.IAdapterFactory;
-import org.apache.asterix.metadata.external.IAdapterFactory.SupportedOperation;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.asterix.metadata.feeds.ExternalDataScanOperatorDescriptor;
import org.apache.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
import org.apache.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
import org.apache.asterix.metadata.feeds.FeedUtil;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.metadata.utils.DatasetUtils;
import org.apache.asterix.metadata.utils.ExternalDatasetsRegistry;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
@@ -168,6 +170,7 @@
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
@@ -349,8 +352,8 @@
// querying an external dataset
Dataset dataset = ((DatasetDataSource) dataSource).getDataset();
String itemTypeName = dataset.getItemTypeName();
- IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
- itemTypeName).getDatatype();
+ IAType itemType = MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName).getDatatype();
ExternalDatasetDetails edd = (ExternalDatasetDetails) dataset.getDatasetDetails();
IAdapterFactory adapterFactory = getConfiguredAdapterFactory(dataset, edd.getAdapter(),
edd.getProperties(), itemType, false, null);
@@ -395,8 +398,8 @@
.getSerializerDeserializer(feedOutputType);
RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
- FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
- BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+ FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties()
+ .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
if (feedPolicy == null) {
throw new AlgebricksException("Feed not configured with a policy");
}
@@ -404,8 +407,8 @@
FeedConnectionId feedConnectionId = new FeedConnectionId(feedDataSource.getId().getDataverseName(),
feedDataSource.getId().getDatasourceName(), feedDataSource.getTargetDataset());
feedCollector = new FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
- feedDataSource.getSourceFeedId(), (ARecordType) feedOutputType, feedDesc,
- feedPolicy.getProperties(), feedDataSource.getLocation());
+ feedDataSource.getSourceFeedId(), feedOutputType, feedDesc, feedPolicy.getProperties(),
+ feedDataSource.getLocation());
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedCollector,
determineLocationConstraint(feedDataSource));
@@ -435,8 +438,8 @@
if (activity.getDataverseName().equals(feedDataSource.getSourceFeedId().getDataverse())
&& activity.getFeedName()
.equals(feedDataSource.getSourceFeedId().getFeedName())) {
- locations = activity.getFeedActivityDetails().get(
- FeedActivityDetails.COMPUTE_LOCATIONS);
+ locations = activity.getFeedActivityDetails()
+ .get(FeedActivityDetails.COMPUTE_LOCATIONS);
locationArray = locations.split(",");
break;
}
@@ -490,8 +493,8 @@
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(JobSpecification jobSpec,
LoadableDataSource alds, IAdapterFactory adapterFactory, RecordDescriptor rDesc, boolean isPKAutoGenerated,
List<List<String>> primaryKeys, ARecordType recType, int pkIndex) throws AlgebricksException {
- if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ) || adapterFactory
- .getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
+ if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ)
+ || adapterFactory.getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
throw new AlgebricksException(" External dataset adapter does not support read operation");
}
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec, rDesc,
@@ -597,13 +600,13 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
- throws AlgebricksException {
+ throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only scan datasets of records.");
}
- if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ) || adapterFactory
- .getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
+ if (!(adapterFactory.getSupportedOperations().equals(SupportedOperation.READ)
+ || adapterFactory.getSupportedOperations().equals(SupportedOperation.READ_WRITE))) {
throw new AlgebricksException(" External dataset adapter does not support read operation");
}
@@ -659,8 +662,8 @@
case EXTERNAL:
String libraryName = primaryFeed.getAdaptorName().trim()
.split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
- feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName, adapterFactory
- .getClass().getName(), factoryOutput.second, policyAccessor);
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, primaryFeed, libraryName,
+ adapterFactory.getClass().getName(), factoryOutput.second, policyAccessor);
break;
}
@@ -691,11 +694,11 @@
IBinaryComparatorFactory[] comparatorFactories;
String itemTypeName = dataset.getItemTypeName();
- ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
- dataset.getDataverseName(), itemTypeName).getDatatype();
+ ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName).getDatatype();
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
- dataset, itemType, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] btreeFields = null;
@@ -768,33 +771,34 @@
txnSubsystemProvider, ResourceType.LSM_BTREE);
}
}
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
BTreeSearchOperatorDescriptor btreeSearchOp;
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
- compactionInfo.second, isSecondary ? new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()) : new PrimaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), rtcProvider,
- LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ lowKeyInclusive, highKeyInclusive,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
+ compactionInfo.first, compactionInfo.second,
+ isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
+ : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ rtcProvider, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields, !temp), retainInput, retainNull,
- context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes);
+ filterCmpFactories, btreeFields, filterFields, !temp),
+ retainInput, retainNull, context.getNullWriterFactory(), searchCallbackFactory,
+ minFilterFieldIndexes, maxFilterFieldIndexes);
} else {
// External dataset <- use the btree with buddy btree->
// Be Careful of Key Start Index ?
int[] buddyBreeFields = new int[] { numSecondaryKeys };
ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory(
- compactionInfo.first, compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, getStorageProperties()
- .getBloomFilterFalsePositiveRate(), buddyBreeFields,
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+ getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
@@ -858,17 +862,15 @@
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
if (secondaryIndex == null) {
- throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
- + dataset.getDatasetName());
+ throw new AlgebricksException(
+ "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName());
}
List<List<String>> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
int numSecondaryKeys = secondaryKeyFields.size();
if (numSecondaryKeys != 1) {
- throw new AlgebricksException(
- "Cannot use "
- + numSecondaryKeys
- + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+ throw new AlgebricksException("Cannot use " + numSecondaryKeys
+ + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
}
Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
secondaryKeyFields.get(0), recType);
@@ -905,8 +907,8 @@
}
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] rtreeFields = null;
if (filterTypeTraits != null) {
@@ -919,8 +921,8 @@
}
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
: new SecondaryIndexSearchOperationCallbackFactory();
@@ -928,17 +930,19 @@
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- spPc.first, typeTraits, comparatorFactories, keyFields, new LSMRTreeDataflowHelperFactory(
- valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
+ spPc.first, typeTraits, comparatorFactories, keyFields,
+ new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
+ primaryComparatorFactories,
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
- compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
- nestedKeyType.getTypeTag(), comparatorFactories.length),
+ compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
- filterTypeTraits, filterCmpFactories, filterFields, !temp), retainInput, retainNull,
- context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
- maxFilterFieldIndexes);
+ filterTypeTraits, filterCmpFactories, filterFields, !temp),
+ retainInput, retainNull, context.getNullWriterFactory(), searchCallbackFactory,
+ minFilterFieldIndexes, maxFilterFieldIndexes);
} else {
// External Dataset
@@ -954,8 +958,8 @@
// Create the operator
rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
- spPc.first, typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory,
- retainInput, retainNull, context.getNullWriterFactory(), searchCallbackFactory);
+ spPc.first, typeTraits, comparatorFactories, keyFields, indexDataflowHelperFactory, retainInput,
+ retainNull, context.getNullWriterFactory(), searchCallbackFactory);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
@@ -1034,8 +1038,8 @@
}
String tName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, aqlId.getDataverseName(), tName).getDatatype();
- AqlDataSourceType datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? AqlDataSourceType.EXTERNAL_DATASET
- : AqlDataSourceType.INTERNAL_DATASET;
+ AqlDataSourceType datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL)
+ ? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET;
return new DatasetDataSource(aqlId, aqlId.getDataverseName(), aqlId.getDatasourceName(), itemType,
datasourceType);
}
@@ -1102,8 +1106,8 @@
String indexName = primaryIndex.getIndexName();
String itemTypeName = dataset.getItemTypeName();
- ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
- dataset.getDataverseName(), itemTypeName).getDatatype();
+ ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName).getDatatype();
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
itemType, context.getBinaryComparatorFactoryProvider());
@@ -1115,8 +1119,8 @@
long numElementsHint = getCardinalityPerPartitionHint(dataset);
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
- dataset, itemType, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = DatasetUtils.createFilterFields(dataset);
int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -1125,15 +1129,16 @@
// right callback
// (ex. what's the expected behavior when there is an error during
// bulkload?)
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation,
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true,
new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
- compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ compactionInfo.first, compactionInfo.second,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
filterCmpFactories, btreeFields, filterFields, !temp));
@@ -1148,13 +1153,13 @@
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
- throws AlgebricksException {
+ throws AlgebricksException {
String datasetName = dataSource.getId().getDatasourceName();
Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
if (dataset == null) {
- throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse "
- + dataSource.getId().getDataverseName());
+ throw new AlgebricksException(
+ "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName());
}
boolean temp = dataset.getDatasetDetails().isTemp();
isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
@@ -1183,8 +1188,8 @@
String indexName = primaryIndex.getIndexName();
String itemTypeName = dataset.getItemTypeName();
- ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
- dataSource.getId().getDataverseName(), itemTypeName).getDatatype();
+ ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+ .getDatatype(mdTxnCtx, dataSource.getId().getDataverseName(), itemTypeName).getDatatype();
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
@@ -1203,24 +1208,26 @@
}
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
- dataset, itemType, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ itemType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = DatasetUtils.createFilterFields(dataset);
int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
: new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- datasetId), compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
- true, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, !temp);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
+ new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, !temp);
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1246,7 +1253,7 @@
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, boolean bulkload)
- throws AlgebricksException {
+ throws AlgebricksException {
return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, typeEnv, keys, payload,
additionalNonKeyFields, recordDesc, context, spec, bulkload);
}
@@ -1284,14 +1291,14 @@
AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
switch (secondaryIndex.getIndexType()) {
case BTREE: {
- return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
- primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec,
- indexOp, bulkload);
+ return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
+ secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+ bulkload);
}
case RTREE: {
- return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv,
- primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec,
- indexOp, bulkload);
+ return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, typeEnv, primaryKeys,
+ secondaryKeys, additionalNonKeyFields, filterFactory, recordDesc, context, spec, indexOp,
+ bulkload);
}
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
@@ -1302,8 +1309,8 @@
indexOp, secondaryIndex.getIndexType(), bulkload);
}
default: {
- throw new AlgebricksException("Insert and delete not implemented for index type: "
- + secondaryIndex.getIndexType());
+ throw new AlgebricksException(
+ "Insert and delete not implemented for index type: " + secondaryIndex.getIndexType());
}
}
}
@@ -1374,7 +1381,7 @@
IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload)
- throws AlgebricksException {
+ throws AlgebricksException {
// Sanity checks.
if (primaryKeys.size() > 1) {
@@ -1549,7 +1556,8 @@
tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField,
keyFields, isPartitioned, true);
- return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(tokenizerOp, splitsAndConstraint.second);
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(tokenizerOp,
+ splitsAndConstraint.second);
} catch (MetadataException e) {
throw new AlgebricksException(e);
@@ -1564,7 +1572,7 @@
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ throws AlgebricksException {
return getIndexInsertOrDeleteRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, context, spec,
false);
@@ -1572,7 +1580,7 @@
private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
- throws AlgebricksException {
+ throws AlgebricksException {
// No filtering condition.
if (filterExpr == null) {
return null;
@@ -1641,8 +1649,8 @@
dataset.getDatasetName(), indexName);
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] btreeFields = null;
if (filterTypeTraits != null) {
@@ -1662,15 +1670,15 @@
Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
secondaryKeyNames.get(i), recType);
IAType keyType = keyPairType.first;
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, true);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
for (List<String> partitioningKey : partitioningKeys) {
IAType keyType = recType.getSubFieldType(partitioningKey);
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- keyType, true);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+ true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
@@ -1683,15 +1691,17 @@
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_BTREE);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
- IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- datasetId), compactionInfo.first, compactionInfo.second,
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
+ IIndexDataflowHelperFactory idfh = new LSMBTreeDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
@@ -1707,14 +1717,15 @@
op = new AsterixLSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields,
- fieldPermutation, indexOp, new LSMBTreeDataflowHelperFactory(
- new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first,
- compactionInfo.second, new SecondaryIndexOperationTrackerProvider(
- dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+ fieldPermutation, indexOp,
+ new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields, !temp), filterFactory,
- modificationCallbackFactory, false, indexName);
+ filterCmpFactories, btreeFields, filterFields, !temp),
+ filterFactory, modificationCallbackFactory, false, indexName);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
} catch (MetadataException e) {
@@ -1853,8 +1864,8 @@
secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] invertedIndexFields = null;
@@ -1884,18 +1895,20 @@
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_INVERTED_INDEX) : new SecondaryIndexModificationOperationCallbackFactory(jobId,
- datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_INVERTED_INDEX);
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_INVERTED_INDEX)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_INVERTED_INDEX);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory indexDataFlowFactory;
if (!isPartitioned) {
- indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
- datasetId), compactionInfo.first, compactionInfo.second,
+ indexDataFlowFactory = new LSMInvertedIndexDataflowHelperFactory(
+ new AsterixVirtualBufferCacheProvider(datasetId), compactionInfo.first, compactionInfo.second,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
@@ -1992,8 +2005,8 @@
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
for (i = 0; i < numSecondaryKeys; i++) {
- comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
- nestedKeyType, true);
+ comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
+ .getBinaryComparatorFactory(nestedKeyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
}
@@ -2015,8 +2028,8 @@
}
ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
- IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(
- dataset, recType, context.getBinaryComparatorFactoryProvider());
+ IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
+ recType, context.getBinaryComparatorFactoryProvider());
int[] filterFields = null;
int[] rtreeFields = null;
if (filterTypeTraits != null) {
@@ -2032,17 +2045,19 @@
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp
+ ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
+ : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_RTREE);
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
- dataset, mdTxnCtx);
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
+ .getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory idfh = new LSMRTreeDataflowHelperFactory(valueProviderFactories,
- RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
- dataset.getDatasetId()), compactionInfo.first, compactionInfo.second,
- new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ RTreePolicyType.RTREE, primaryComparatorFactories,
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+ compactionInfo.second, new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
@@ -2059,15 +2074,16 @@
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
- primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset
- .getDatasetId()), compactionInfo.first, compactionInfo.second,
+ primaryComparatorFactories,
+ new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
+ compactionInfo.second,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
- LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
- nestedKeyType.getTypeTag(), comparatorFactories.length), storageProperties
- .getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
- filterCmpFactories, filterFields, !temp), filterFactory,
- modificationCallbackFactory, false, indexName);
+ LSMRTreeIOOperationCallbackFactory.INSTANCE,
+ proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
+ storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
+ filterTypeTraits, filterCmpFactories, filterFields, !temp),
+ filterFactory, modificationCallbackFactory, false, indexName);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
@@ -2257,8 +2273,8 @@
try {
type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverse, typeName);
} catch (MetadataException e) {
- throw new AlgebricksException("Metadata exception while looking up type '" + typeName + "' in dataverse '"
- + dataverse + "'", e);
+ throw new AlgebricksException(
+ "Metadata exception while looking up type '" + typeName + "' in dataverse '" + dataverse + "'", e);
}
if (type == null) {
throw new AlgebricksException("Type name '" + typeName + "' unknown in dataverse '" + dataverse + "'");
@@ -2368,16 +2384,16 @@
String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
if (create) {
for (int j = 0; j < nodeStores.length; j++) {
- File f = new File(ioDevices[0] + File.separator + nodeStores[j] + File.separator
- + relPathFile);
+ File f = new File(
+ ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
splitArray.add(new FileSplit(nd, new FileReference(f), 0));
}
} else {
int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(nd);
for (int j = 0; j < nodeStores.length; j++) {
for (int k = 0; k < numIODevices; k++) {
- File f = new File(ioDevices[0] + File.separator + nodeStores[j] + File.separator
- + relPathFile);
+ File f = new File(
+ ioDevices[0] + File.separator + nodeStores[j] + File.separator + relPathFile);
splitArray.add(new FileSplit(nd, new FileReference(f), 0));
}
}
@@ -2407,4 +2423,72 @@
this.locks = locks;
}
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
+ JobSpecification jobSpec, Dataset dataset, Index secondaryIndex, int[] ridIndexes, boolean retainInput,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> outputVars, IOperatorSchema opSchema,
+ JobGenContext context, AqlMetadataProvider metadataProvider, boolean retainNull)
+ throws AlgebricksException {
+ // Get data type
+ IAType itemType = null;
+ try {
+ itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
+ dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
+ } catch (MetadataException e) {
+ e.printStackTrace();
+ throw new AlgebricksException("Unable to get item type from metadata " + e);
+ }
+ if (itemType.getTypeTag() != ATypeTag.RECORD) {
+ throw new AlgebricksException("Can only scan datasets of records.");
+ }
+
+ // Create the adapter factory <- right now there is only one. if there are more in the future, we can create a map->
+ ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
+ HDFSLookupAdapterFactory adapterFactory = new HDFSLookupAdapterFactory();
+ adapterFactory.configure(itemType, retainInput, ridIndexes, datasetDetails.getProperties(), retainNull);
+
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo;
+ try {
+ compactionInfo = DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+ } catch (MetadataException e) {
+ throw new AlgebricksException(" Unabel to create merge policy factory for external dataset", e);
+ }
+
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ // Create the file index data flow helper
+ ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
+ compactionInfo.first, compactionInfo.second,
+ new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+ AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+ metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(),
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, metadataProvider), !temp);
+
+ // Create the out record descriptor, appContext and fileSplitProvider for the files index
+ RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+ try {
+ spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
+ dataset.getDatasetName(), dataset.getDatasetName().
+
+ concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX), false);
+ } catch (
+
+ Exception e)
+
+ {
+ throw new AlgebricksException(e);
+ }
+
+ ISearchOperationCallbackFactory searchOpCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+ : new SecondaryIndexSearchOperationCallbackFactory();
+ // Create the operator
+ ExternalLoopkupOperatorDiscriptor op = new ExternalLoopkupOperatorDiscriptor(jobSpec, adapterFactory,
+ outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(),
+ appContext.getStorageManagerInterface(), spPc.first, dataset.getDatasetId(),
+ metadataProvider.getStorageProperties().getBloomFilterFalsePositiveRate(), searchOpCallbackFactory,
+ retainNull, context.getNullWriterFactory());
+
+ // Return value
+ return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, spPc.second);
+ }
}
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
index b3378a4..989e4a3 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FieldExtractingAdapterFactory.java
@@ -21,8 +21,8 @@
import java.util.Map;
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.metadata.external.IAdapterFactory;
-import org.apache.asterix.metadata.external.IAdapterFactory.SupportedOperation;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory.SupportedOperation;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapterFactory.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapterFactory.java
index 4f79d01..2930662 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapterFactory.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/PKGeneratingAdapterFactory.java
@@ -21,7 +21,7 @@
import java.util.Map;
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.metadata.external.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java
index 7095a7d..efb9318 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/ExternalFileTupleTranslator.java
@@ -26,11 +26,11 @@
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import org.apache.asterix.metadata.bootstrap.MetadataRecordTypes;
-import org.apache.asterix.metadata.entities.ExternalFile;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.base.AInt64;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
index 09ed3cf..a87d2fb 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/AbstractFeedDatasourceAdapter.java
@@ -19,6 +19,7 @@
package org.apache.asterix.metadata.feeds;
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
+import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
public abstract class AbstractFeedDatasourceAdapter implements IDatasourceAdapter {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
index 4321b6d..ba985bc 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ExternalDataScanOperatorDescriptor.java
@@ -19,7 +19,7 @@
package org.apache.asterix.metadata.feeds;
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
-import org.apache.asterix.metadata.external.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index eed8b0d..486f45b 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -27,9 +27,10 @@
import org.apache.asterix.common.feeds.IngestionRuntime;
import org.apache.asterix.common.feeds.SubscribableFeedRuntimeId;
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
import org.apache.asterix.metadata.entities.PrimaryFeed;
-import org.apache.asterix.metadata.functions.ExternalLibraryManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index ba5aac7..404d37c 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -36,6 +36,7 @@
import org.apache.asterix.common.feeds.api.IFeedAdapter;
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
import org.apache.asterix.common.feeds.api.IFeedSubscriptionManager;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.common.feeds.api.ISubscriberRuntime;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
index d9d6bdf..f833019 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaComputeNodePushable.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
import org.apache.asterix.common.feeds.api.ISubscribableRuntime;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
index 3f4b30d..86f8750 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaNodePushable.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
index a18f4d8..b409745 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetaStoreNodePushable.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.feeds.api.IFeedManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.common.feeds.api.IFeedRuntime.Mode;
+import org.apache.asterix.external.feeds.FeedPolicyEnforcer;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
index e5e9df3..72b7c15 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedUtil.java
@@ -38,6 +38,9 @@
import org.apache.asterix.common.feeds.FeedRuntimeId;
import org.apache.asterix.common.feeds.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
+import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -52,8 +55,6 @@
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.metadata.entities.PrimaryFeed;
import org.apache.asterix.metadata.entities.SecondaryFeed;
-import org.apache.asterix.metadata.external.IAdapterFactory;
-import org.apache.asterix.metadata.functions.ExternalLibraryManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ITypedAdapterFactory.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ITypedAdapterFactory.java
index d5bf70f..7a26560 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ITypedAdapterFactory.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/ITypedAdapterFactory.java
@@ -20,7 +20,7 @@
import java.util.Map;
-import org.apache.asterix.metadata.external.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
public interface ITypedAdapterFactory extends IAdapterFactory {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
index dba29a4..f7a0e5d 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtils.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
import org.apache.asterix.metadata.MetadataException;
@@ -37,8 +38,8 @@
import org.apache.asterix.metadata.bootstrap.MetadataConstants;
import org.apache.asterix.metadata.entities.CompactionPolicy;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.external.IndexingConstants;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.ARecordType;
@@ -57,9 +58,8 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
public class DatasetUtils {
- public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset,
- ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
- throws AlgebricksException {
+ public static IBinaryComparatorFactory[] computeKeysBinaryComparatorFactories(Dataset dataset, ARecordType itemType,
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider) throws AlgebricksException {
List<List<String>> partitioningKeys = getPartitioningKeys(dataset);
IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[partitioningKeys.size()];
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
@@ -139,7 +139,7 @@
public static List<List<String>> getPartitioningKeys(Dataset dataset) {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
- return IndexingConstants.getRIDKeys(dataset);
+ return IndexingConstants.getRIDKeys(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
}
return ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
}
@@ -150,7 +150,7 @@
public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
ARecordType itemType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
- throws AlgebricksException {
+ throws AlgebricksException {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return null;
}
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/ExternalFilesIndexOperatorDescriptor.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/ExternalFilesIndexOperatorDescriptor.java
index f4b3392..bc73eba 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/ExternalFilesIndexOperatorDescriptor.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/ExternalFilesIndexOperatorDescriptor.java
@@ -22,9 +22,9 @@
import java.util.List;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.indexing.FilesIndexDescription;
import org.apache.asterix.external.indexing.dataflow.FileIndexTupleTranslator;
-import org.apache.asterix.metadata.entities.ExternalFile;
-import org.apache.asterix.metadata.external.FilesIndexDescription;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
index 2838bd4..5d28f3d 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/GenericSocketFeedAdapterFactory.java
@@ -29,9 +29,9 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
import org.apache.asterix.external.adapter.factory.StreamBasedAdapterFactory;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
-import org.apache.asterix.metadata.entities.ExternalFile;
+import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.util.AsterixRuntimeUtil;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 82f9c7b..a8c77ac 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -24,11 +24,11 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
import org.apache.asterix.external.adapter.factory.NCFileSystemAdapterFactory;
import org.apache.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import org.apache.asterix.external.dataset.adapter.FileSystemBasedAdapter;
-import org.apache.asterix.metadata.external.IAdapterFactory;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
index 4234a88..3d2f5af 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/SocketClientAdapterFactory.java
@@ -22,7 +22,7 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index e9004c4..f7e79f7 100644
--- a/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/org/apache/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -24,8 +24,8 @@
import org.apache.asterix.common.feeds.api.IDatasourceAdapter;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
+import org.apache.asterix.external.adapter.factory.IFeedAdapterFactory;
import org.apache.asterix.external.adapter.factory.StreamBasedAdapterFactory;
-import org.apache.asterix.metadata.feeds.IFeedAdapterFactory;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory;