Introduces Feeds 2.0
commit c3f577861fc705d848c1641605689cadd6973bae
Merge: ebc4cae fc0c2c0
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Fri Jun 26 13:04:05 2015 -0700
Merge branch 'raman/feeds_2_release' of https://code.google.com/p/asterixdb-sandbox into raman/feeds_2_release
Conflicts:
asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedServlet.java
asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
commit ebc4cae21a7302869f953df1ebda601e798d12d2
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Sat Jun 20 17:14:45 2015 -0700
Introduces Feeds 2.0
Some of the prominent changes introduced are as follows:
a) Support for building a cascade network of feeds (via secondary feeds feature)
b) Feed Management Console for tracking active feeds and associated metrics
c) Support for elastic runtime for data ingestion
d) Improved fault-tolerance with support for logging of failed records
Documentation has been added at asterix-doc/src/site/markdown/feeds/
commit fc0c2c0549a6ee8b202e57607d2e110478cd57bb
Author: ramangrover29 <ramangrover29@gmail.com>
Date: Sat Jun 20 17:14:45 2015 -0700
Introduces Feeds 2.0
Some of the prominent changes introduced are as follows:
a) Support for building a cascade network of feeds (via secondary feeds feature)
b) Feed Management Console for tracking active feeds and associated metrics
c) Support for elastic runtime for data ingestion
d) Improved fault-tolerance with support for logging of failed records
Documentation has been added at asterix-doc/src/site/markdown/feeds/
Change-Id: I498f01c591a229aaf51cec43ab20f3e5c4f072f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/297
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Steven Jacobs <sjaco002@ucr.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 0b045f1..7d64b61 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -136,9 +136,14 @@
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
- <version>2.2.3</version>
+ <version>[3.0,)</version>
</dependency>
<dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>4.0.2</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<type>jar</type>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index fa66715..e4f118f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -19,12 +19,12 @@
import java.util.List;
import java.util.Map;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,7 +32,7 @@
/**
* A factory class for creating the @see {CNNFeedAdapter}.
*/
-public class CNNFeedAdapterFactory implements ITypedAdapterFactory {
+public class CNNFeedAdapterFactory implements IFeedAdapterFactory {
private static final long serialVersionUID = 1L;
private Map<String, String> configuration;
@@ -40,6 +40,7 @@
private List<String> feedURLs = new ArrayList<String>();
private static Map<String, String> topicFeeds = new HashMap<String, String>();
private ARecordType recordType;
+ private FeedPolicyAccessor policyAccessor;
public static final String KEY_RSS_URL = "topic";
public static final String KEY_INTERVAL = "interval";
@@ -91,22 +92,14 @@
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.TYPED;
- }
-
- @Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
initializeFeedURLs(rssURLProperty);
- recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
- false);
-
+ this.recordType = outputType;
}
private void initializeFeedURLs(String rssURLProperty) {
@@ -147,4 +140,14 @@
return recordType;
}
+ @Override
+ public boolean isRecordTrackingEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIntakeProgressTracker createIntakeProgressTracker() {
+ return null;
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index dbf3fd7..ee481de 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -31,15 +31,17 @@
import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.indexing.dataflow.HDFSObjectTupleParserFactory;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.ICCContext;
@@ -53,7 +55,7 @@
/**
* A factory class for creating an instance of HDFSAdapter
*/
-public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
+public class HDFSAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String HDFS_ADAPTER_NAME = "hdfs";
@@ -80,6 +82,7 @@
private boolean configured = false;
public static Scheduler hdfsScheduler;
private static boolean initialized = false;
+ protected List<ExternalFile> files;
private static Scheduler initializeHDFSScheduler() {
ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
@@ -137,11 +140,6 @@
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.GENERIC;
- }
-
- @Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
if (!configured) {
throw new IllegalStateException("Adapter factory has not been configured yet");
@@ -203,25 +201,41 @@
return new AlgebricksAbsolutePartitionConstraint(cluster);
}
+ @Override
+ public ARecordType getAdapterOutputType() {
+ return (ARecordType) atype;
+ }
+
+ @Override
+ public InputDataFormat getInputDataFormat() {
+ return InputDataFormat.UNKNOWN;
+ }
+
/*
* This method is overridden to do the following:
* if data is text data (adm or delimited text), it will use a text tuple parser,
* otherwise it will use hdfs record object parser
*/
protected void configureFormat(IAType sourceDatatype) throws Exception {
- String specifiedFormat = (String) configuration.get(KEY_FORMAT);
- if (specifiedFormat == null) {
- throw new IllegalArgumentException(" Unspecified data format");
- } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
- parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, false);
- } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
- parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, false);
- } else if (FORMAT_BINARY.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
- parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
- } else {
- throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
- }
- }
+ String specifiedFormat = (String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT);
+ if (specifiedFormat == null) {
+ throw new IllegalArgumentException(" Unspecified data format");
+ }
+
+ if(AsterixTupleParserFactory.FORMAT_BINARY.equalsIgnoreCase(specifiedFormat)){
+ parserFactory = new HDFSObjectTupleParserFactory((ARecordType) atype, this, configuration);
+ } else {
+ InputDataFormat inputFormat = InputDataFormat.UNKNOWN;
+ if (AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
+ inputFormat = InputDataFormat.DELIMITED;
+ } else if (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase(specifiedFormat)) {
+ inputFormat = InputDataFormat.ADM;
+ }
+ parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype
+ , inputFormat);
+ }
+
+ }
/**
* Instead of creating the split using the input format, we do it manually
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
index 2a4836c..37b8050 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSIndexingAdapterFactory.java
@@ -16,22 +16,24 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HDFSIndexingAdapter;
import edu.uci.ics.asterix.external.indexing.dataflow.HDFSIndexingParserFactory;
import edu.uci.ics.asterix.external.indexing.dataflow.IndexingScheduler;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.AUnionType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -39,7 +41,12 @@
import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
@@ -83,11 +90,6 @@
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.GENERIC;
- }
-
- @Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
if (!configured) {
throw new IllegalStateException("Adapter factory has not been configured yet");
@@ -104,7 +106,8 @@
((HDFSIndexingParserFactory) parserFactory).setArguments(configuration);
HDFSIndexingAdapter hdfsIndexingAdapter = new HDFSIndexingAdapter(atype, readSchedule, executed, inputSplits,
conf, clusterLocations, files, parserFactory, ctx, nodeName,
- (String) configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT), (String) configuration.get(KEY_FORMAT));
+ (String) configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
+ (String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT));
return hdfsIndexingAdapter;
}
@@ -131,12 +134,13 @@
protected void configureFormat(IAType sourceDatatype) throws Exception {
- char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
- char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
+ char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
+ char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
parserFactory = new HDFSIndexingParserFactory((ARecordType) atype,
- configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT), configuration.get(KEY_FORMAT), delimiter,
- quote, configuration.get(HDFSAdapterFactory.KEY_PARSER));
+ configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT),
+ configuration.get(AsterixTupleParserFactory.KEY_FORMAT), delimiter, quote,
+ configuration.get(HDFSAdapterFactory.KEY_PARSER));
}
/**
@@ -165,7 +169,7 @@
if (tag == null) {
throw new NotImplementedException("Failed to get the type information for field " + i + ".");
}
- IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
+ IValueParserFactory vpf = valueParserFactoryMap.get(tag);
if (vpf == null) {
throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
}
@@ -190,4 +194,16 @@
cluster = locs.toArray(cluster);
return new AlgebricksAbsolutePartitionConstraint(cluster);
}
+
+ private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
+
+ private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
+ Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+ m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
+ m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
+ m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
+ m.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
+ m.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
+ return m;
+ }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index ab59241..c5ebe67 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -17,20 +17,22 @@
import java.util.List;
import java.util.Map;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
* A factory class for creating an instance of HiveAdapter
*/
-public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
+public class HiveAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String HIVE_DATABASE = "database";
@@ -61,11 +63,7 @@
return "hive";
}
- @Override
- public AdapterType getAdapterType() {
- return AdapterType.GENERIC;
- }
-
+
@Override
public SupportedOperation getSupportedOperations() {
return SupportedOperation.READ;
@@ -76,6 +74,7 @@
if (!configured) {
populateConfiguration(configuration);
hdfsAdapterFactory.configure(configuration, outputType);
+ this.atype = (IAType) outputType;
}
}
@@ -90,8 +89,8 @@
+ configuration.get(HIVE_TABLE);
}
configuration.put(HDFSAdapterFactory.KEY_PATH, tablePath);
- if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
- throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
+ if (!configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
+ throw new IllegalArgumentException("format" + configuration.get(AsterixTupleParserFactory.KEY_FORMAT) + " is not supported");
}
if (!(configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT) || configuration
@@ -106,9 +105,19 @@
return hdfsAdapterFactory.getPartitionConstraint();
}
+
@Override
+ public ARecordType getAdapterOutputType() {
+ return (ARecordType) atype;
+ }
+
+ @Override
+ public InputDataFormat getInputDataFormat() {
+ return InputDataFormat.UNKNOWN;
+ }
+
public void setFiles(List<ExternalFile> files) {
- this.files = files;
+ hdfsAdapterFactory.setFiles(files);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 110a965..446199e 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -20,15 +20,17 @@
import java.util.logging.Level;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter;
import edu.uci.ics.asterix.external.util.DNSResolverFactory;
import edu.uci.ics.asterix.external.util.INodeResolver;
import edu.uci.ics.asterix.external.util.INodeResolverFactory;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -41,7 +43,7 @@
* NCFileSystemAdapter reads external data residing on the local file system of
* an NC.
*/
-public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IGenericAdapterFactory {
+public class NCFileSystemAdapterFactory extends StreamBasedAdapterFactory implements IAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
@@ -50,6 +52,8 @@
private IAType sourceDatatype;
private FileSplit[] fileSplits;
+ private ARecordType outputType;
+
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
@@ -62,10 +66,6 @@
return NC_FILE_SYSTEM_ADAPTER_NAME;
}
- @Override
- public AdapterType getAdapterType() {
- return AdapterType.GENERIC;
- }
@Override
public SupportedOperation getSupportedOperations() {
@@ -75,7 +75,8 @@
@Override
public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
- String[] splits = ((String) configuration.get(KEY_PATH)).split(",");
+ this.outputType = outputType;
+ String[] splits = ((String) configuration.get(AsterixTupleParserFactory.KEY_PATH)).split(",");
IAType sourceDatatype = (IAType) outputType;
configureFileSplits(splits);
configureFormat(sourceDatatype);
@@ -127,7 +128,7 @@
private static INodeResolver initializeNodeResolver() {
INodeResolver nodeResolver = null;
- String configuredNodeResolverFactory = System.getProperty(NODE_RESOLVER_FACTORY_PROPERTY);
+ String configuredNodeResolverFactory = System.getProperty(AsterixTupleParserFactory.NODE_RESOLVER_FACTORY_PROPERTY);
if (configuredNodeResolverFactory != null) {
try {
nodeResolver = ((INodeResolverFactory) (Class.forName(configuredNodeResolverFactory).newInstance()))
@@ -145,8 +146,17 @@
}
return nodeResolver;
}
-
+
@Override
+ public ARecordType getAdapterOutputType() {
+ return outputType;
+ }
+
+ @Override
+ public InputDataFormat getInputDataFormat() {
+ return InputDataFormat.UNKNOWN;
+ }
+
public void setFiles(List<ExternalFile> files) throws AlgebricksException {
throw new AlgebricksException("can't set files for this Adapter");
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
index b4dbe13..298a443 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedAzureTwitterAdapterFactory.java
@@ -3,12 +3,14 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
import edu.uci.ics.asterix.external.dataset.adapter.PullBasedAzureTwitterAdapter;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.entities.Datatype;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
@@ -16,7 +18,7 @@
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-public class PullBasedAzureTwitterAdapterFactory implements ITypedAdapterFactory {
+public class PullBasedAzureTwitterAdapterFactory implements IFeedAdapterFactory {
private static final long serialVersionUID = 1L;
@@ -27,13 +29,14 @@
private static final String ACCOUNT_NAME_KEY = "account-name";
private static final String ACCOUNT_KEY_KEY = "account-key";
- private ARecordType recordType;
+ private ARecordType outputType;
private Map<String, String> configuration;
private String tableName;
private String azureAccountName;
private String azureAccountKey;
private String[] locations;
private String[] partitions;
+ private FeedPolicyAccessor ingestionPolicy;
@Override
public SupportedOperation getSupportedOperations() {
@@ -46,11 +49,6 @@
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.TYPED;
- }
-
- @Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
String locationsStr = configuration.get(INGESTOR_LOCATIONS_KEY);
if (locationsStr == null) {
@@ -63,17 +61,18 @@
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
return new PullBasedAzureTwitterAdapter(azureAccountName, azureAccountKey, tableName, partitions,
- configuration, ctx, recordType);
+ configuration, ctx, outputType);
}
@Override
public ARecordType getAdapterOutputType() {
- return recordType;
+ return outputType;
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
+ this.outputType = outputType;
tableName = configuration.get(TABLE_NAME_KEY);
if (tableName == null) {
@@ -125,7 +124,7 @@
if (type.getTypeTag() != ATypeTag.RECORD) {
throw new IllegalStateException();
}
- recordType = (ARecordType) t.getDatatype();
+ outputType = (ARecordType) t.getDatatype();
MetadataManager.INSTANCE.commitTransaction(ctx);
} catch (Exception e) {
if (ctx != null) {
@@ -136,4 +135,21 @@
MetadataManager.INSTANCE.releaseReadLatch();
}
}
+
+ @Override
+ public boolean isRecordTrackingEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIntakeProgressTracker createIntakeProgressTracker() {
+ return null;
+ }
+
+ public FeedPolicyAccessor getIngestionPolicy() {
+ return ingestionPolicy;
+ }
+
+
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index 6058bd2..c43ca05 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -15,13 +15,18 @@
package edu.uci.ics.asterix.external.adapter.factory;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
import edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.SearchAPIConstants;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -31,29 +36,22 @@
* This adapter provides the functionality of fetching tweets from Twitter service
* via pull-based Twitter API.
*/
-public class PullBasedTwitterAdapterFactory implements ITypedAdapterFactory {
+public class PullBasedTwitterAdapterFactory implements IFeedAdapterFactory {
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(PullBasedTwitterAdapterFactory.class.getName());
+
public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
- private Map<String, String> configuration;
- private static ARecordType recordType = initOutputType();
+ private static final String DEFAULT_INTERVAL = "10"; // 10 seconds
+ private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
- private static ARecordType initOutputType() {
- ARecordType recordType = null;
- String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
- IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
- BuiltinType.ASTRING };
- try {
- recordType = new ARecordType("TweetType", fieldNames, fieldTypes, false);
- } catch (Exception e) {
- throw new IllegalStateException("Unable to create adapter output type");
- }
- return recordType;
- }
+ private ARecordType outputType;
+
+ private Map<String, String> configuration;
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- return new PullBasedTwitterAdapter(configuration, recordType, ctx);
+ return new PullBasedTwitterAdapter(configuration, outputType, ctx);
}
@Override
@@ -62,28 +60,57 @@
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.TYPED;
- }
-
- @Override
public SupportedOperation getSupportedOperations() {
return SupportedOperation.READ;
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+ this.outputType = outputType;
this.configuration = configuration;
+ TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+
+ if (configuration.get(SearchAPIConstants.QUERY) == null) {
+ throw new AsterixException("parameter " + SearchAPIConstants.QUERY
+ + " not specified as part of adaptor configuration");
+ }
+
+ String interval = configuration.get(SearchAPIConstants.INTERVAL);
+ if (interval != null) {
+ try {
+ Integer.parseInt(interval);
+ } catch (NumberFormatException nfe) {
+ throw new IllegalArgumentException("parameter " + SearchAPIConstants.INTERVAL
+ + " is defined incorrectly, expecting a number");
+ }
+ } else {
+ configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
+ + DEFAULT_INTERVAL + ")");
+ }
+ }
+
}
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- return new AlgebricksCountPartitionConstraint(1);
+ return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+ }
+
+ @Override
+ public boolean isRecordTrackingEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIntakeProgressTracker createIntakeProgressTracker() {
+ return null;
}
@Override
public ARecordType getAdapterOutputType() {
- return recordType;
+ return outputType;
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
new file mode 100644
index 0000000..c947a6a
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PushBasedTwitterAdapterFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.adapter.factory;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.external.dataset.adapter.PushBasedTwitterAdapter;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Factory for {@link PushBasedTwitterAdapter} instances; the adapter name it reports is "push_twitter".
+ */
+public class PushBasedTwitterAdapterFactory implements IFeedAdapterFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String NAME = "push_twitter";
+
+    // Count handed to AlgebricksCountPartitionConstraint, i.e. the number of adapter partitions requested.
+    private static final int INTAKE_CARDINALITY = 1;
+
+    private ARecordType outputType;
+
+    private Map<String, String> configuration;
+
+    @Override
+    public SupportedOperation getSupportedOperations() {
+        return SupportedOperation.READ;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
+        return new AlgebricksCountPartitionConstraint(INTAKE_CARDINALITY);
+    }
+
+    @Override
+    public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
+        return new PushBasedTwitterAdapter(configuration, outputType, ctx);
+    }
+
+    /**
+     * Stores the configuration and output type, then passes the configuration to
+     * {@link TwitterUtil#initializeConfigurationWithAuthInfo} (presumably to add authentication settings).
+     */
+    @Override
+    public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
+        this.outputType = outputType;
+        this.configuration = configuration;
+        TwitterUtil.initializeConfigurationWithAuthInfo(configuration);
+    }
+
+    @Override
+    public ARecordType getAdapterOutputType() {
+        return outputType;
+    }
+
+    @Override
+    public boolean isRecordTrackingEnabled() {
+        return false;
+    }
+
+    @Override
+    public IIntakeProgressTracker createIntakeProgressTracker() {
+        return null;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index 41f1d56..01ae2f2 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -18,12 +18,12 @@
import java.util.List;
import java.util.Map;
+import edu.uci.ics.asterix.common.feeds.FeedPolicyAccessor;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
import edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -32,7 +32,7 @@
* Factory class for creating an instance of @see {RSSFeedAdapter}.
* RSSFeedAdapter provides the functionality of fetching an RSS based feed.
*/
-public class RSSFeedAdapterFactory implements ITypedAdapterFactory {
+public class RSSFeedAdapterFactory implements IFeedAdapterFactory {
private static final long serialVersionUID = 1L;
public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
@@ -40,12 +40,13 @@
public static final String KEY_INTERVAL = "interval";
private Map<String, String> configuration;
- private ARecordType recordType;
+ private ARecordType outputType;
private List<String> feedURLs = new ArrayList<String>();
+ private FeedPolicyAccessor ingestionPolicy;
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, recordType, ctx);
+ RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter(configuration, outputType, ctx);
return rssFeedAdapter;
}
@@ -55,28 +56,20 @@
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.TYPED;
- }
-
- @Override
public SupportedOperation getSupportedOperations() {
return SupportedOperation.READ;
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
+ this.outputType = outputType;
String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
initializeFeedURLs(rssURLProperty);
configurePartitionConstraints();
- recordType = new ARecordType("FeedRecordType", new String[] { "id", "title", "description", "link" },
- new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING },
- false);
-
}
@Override
@@ -98,7 +91,21 @@
@Override
public ARecordType getAdapterOutputType() {
- return recordType;
+ return outputType;
+ }
+
+ @Override
+ public boolean isRecordTrackingEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIntakeProgressTracker createIntakeProgressTracker() {
+ return null;
+ }
+
+ public FeedPolicyAccessor getIngestionPolicy() {
+ return ingestionPolicy;
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
index 0f340bc..7d45448 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/StreamBasedAdapterFactory.java
@@ -14,30 +14,15 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.util.INodeResolver;
-import edu.uci.ics.asterix.metadata.entities.ExternalFile;
-import edu.uci.ics.asterix.metadata.feeds.ConditionalPushTupleParserFactory;
-import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
-import edu.uci.ics.asterix.om.types.AUnionType;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
-import edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.LongParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
public abstract class StreamBasedAdapterFactory implements IAdapterFactory {
@@ -45,146 +30,18 @@
private static final long serialVersionUID = 1L;
protected static final Logger LOGGER = Logger.getLogger(StreamBasedAdapterFactory.class.getName());
- protected Map<String, String> configuration;
protected static INodeResolver nodeResolver;
- public static final String KEY_FORMAT = "format";
- public static final String KEY_PARSER_FACTORY = "parser";
- public static final String KEY_DELIMITER = "delimiter";
- public static final String KEY_QUOTE = "quote";
- public static final String KEY_HEADER = "header";
- public static final String KEY_PATH = "path";
- public static final String KEY_SOURCE_DATATYPE = "output-type-name";
- // The length of a delimiter should be 1.
- public static final String DEFAULT_DELIMITER = ",";
- // A quote is used to enclose a string if it includes delimiter(s) in it.
- // The length of a quote should be 1.
- public static final String DEFAULT_QUOTE = "\"";
- public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
- public static final String FORMAT_ADM = "adm";
- public static final String NODE_RESOLVER_FACTORY_PROPERTY = "node.Resolver";
- public static final String BATCH_SIZE = "batch-size";
- public static final String BATCH_INTERVAL = "batch-interval";
-
+ protected Map<String, String> configuration;
protected ITupleParserFactory parserFactory;
- protected ITupleParser parser;
+
- protected List<ExternalFile> files;
-
- protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
-
- static {
- typeToValueParserFactMap.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.INT64, LongParserFactory.INSTANCE);
- typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
- }
-
- protected ITupleParserFactory getDelimitedDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
- throws AsterixException {
- int n = recordType.getFieldTypes().length;
- IValueParserFactory[] fieldParserFactories = new IValueParserFactory[n];
- for (int i = 0; i < n; i++) {
- ATypeTag tag = null;
- if (recordType.getFieldTypes()[i].getTypeTag() == ATypeTag.UNION) {
- List<IAType> unionTypes = ((AUnionType) recordType.getFieldTypes()[i]).getUnionList();
- if (unionTypes.size() != 2 && unionTypes.get(0).getTypeTag() != ATypeTag.NULL) {
- throw new NotImplementedException("Non-optional UNION type is not supported.");
- }
- tag = unionTypes.get(1).getTypeTag();
- } else {
- tag = recordType.getFieldTypes()[i].getTypeTag();
- }
- if (tag == null) {
- throw new NotImplementedException("Failed to get the type information for field " + i + ".");
- }
- IValueParserFactory vpf = typeToValueParserFactMap.get(tag);
- if (vpf == null) {
- throw new NotImplementedException("No value parser factory for delimited fields of type " + tag);
- }
- fieldParserFactories[i] = vpf;
- }
-
- char delimiter = getDelimiter(configuration);
- char quote = getQuote(configuration, delimiter);
- boolean hasHeader = getHasHeader(configuration);
-
- return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, fieldParserFactories, delimiter,
- quote, hasHeader, configuration) : new NtDelimitedDataTupleParserFactory(recordType,
- fieldParserFactories, delimiter, quote, hasHeader);
- }
-
- protected ITupleParserFactory getADMDataTupleParserFactory(ARecordType recordType, boolean conditionalPush)
- throws AsterixException {
- try {
- return conditionalPush ? new ConditionalPushTupleParserFactory(recordType, configuration)
- : new AdmSchemafullRecordParserFactory(recordType);
- } catch (Exception e) {
- throw new AsterixException(e);
- }
-
- }
+ public abstract InputDataFormat getInputDataFormat();
protected void configureFormat(IAType sourceDatatype) throws Exception {
- String propValue = (String) configuration.get(BATCH_SIZE);
- int batchSize = propValue != null ? Integer.parseInt(propValue) : -1;
- propValue = configuration.get(BATCH_INTERVAL);
- long batchInterval = propValue != null ? Long.parseLong(propValue) : -1;
- boolean conditionalPush = batchSize > 0 || batchInterval > 0;
-
- String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
- if (parserFactoryClassname == null) {
- String specifiedFormat = configuration.get(KEY_FORMAT);
- if (specifiedFormat == null) {
- throw new IllegalArgumentException(" Unspecified data format");
- } else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
- parserFactory = getDelimitedDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
- } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
- parserFactory = getADMDataTupleParserFactory((ARecordType) sourceDatatype, conditionalPush);
- } else {
- throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
- }
- } else {
- parserFactory = (ITupleParserFactory) Class.forName(parserFactoryClassname).newInstance();
- }
+ parserFactory = new AsterixTupleParserFactory(configuration, (ARecordType) sourceDatatype, getInputDataFormat());
}
- // Get a delimiter from the given configuration
- public static char getDelimiter(Map<String, String> configuration) throws AsterixException {
- String delimiterValue = configuration.get(KEY_DELIMITER);
- if (delimiterValue == null) {
- delimiterValue = DEFAULT_DELIMITER;
- } else if (delimiterValue.length() != 1) {
- throw new AsterixException("'" + delimiterValue
- + "' is not a valid delimiter. The length of a delimiter should be 1.");
- }
- return delimiterValue.charAt(0);
- }
-
- // Get a quote from the given configuration when the delimiter is given
- // Need to pass delimiter to check whether they share the same character
- public static char getQuote(Map<String, String> configuration, char delimiter) throws AsterixException {
- String quoteValue = configuration.get(KEY_QUOTE);
- if (quoteValue == null) {
- quoteValue = DEFAULT_QUOTE;
- } else if (quoteValue.length() != 1) {
- throw new AsterixException("'" + quoteValue + "' is not a valid quote. The length of a quote should be 1.");
- }
-
- // Since delimiter (char type value) can't be null,
- // we only check whether delimiter and quote use the same character
- if (quoteValue.charAt(0) == delimiter) {
- throw new AsterixException("Quote '" + quoteValue + "' cannot be used with the delimiter '" + delimiter
- + "'. ");
- }
-
- return quoteValue.charAt(0);
- }
-
- // Get the header flag
- public static boolean getHasHeader(Map<String, String> configuration) {
- return Boolean.parseBoolean(configuration.get(KEY_HEADER));
- }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
new file mode 100644
index 0000000..4712e7c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/ClientBasedFeedAdapter.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.asterix.external.dataset.adapter.IFeedClient.InflowState;
+import edu.uci.ics.asterix.metadata.feeds.FeedPolicyEnforcer;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.comm.IFrame;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.VSizeFrame;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+
+/**
+ * Acts as an abstract class for all client-based feed adapters. Captures the common logic for obtaining
+ * records from an {@link IFeedClient} and handing them, as tuples, to an {@link ITupleForwardPolicy}.
+ */
+public abstract class ClientBasedFeedAdapter implements IFeedAdapter {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(ClientBasedFeedAdapter.class.getName());
+    // Seconds to wait on each IFeedClient#nextTuple call before the ingestion loop retries.
+    private static final int TIMEOUT_SECONDS = 5;
+
+    protected ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(1);
+    protected IFeedClient pullBasedFeedClient;
+    protected ARecordType adapterOutputType;
+    // volatile: stop() flips this flag while start() is looping on it (i.e. from another thread).
+    protected volatile boolean continueIngestion = true;
+    protected Map<String, String> configuration;
+
+    private FrameTupleAppender appender;
+    private IFrame frame;
+    private long tupleCount = 0;
+    private final IHyracksTaskContext ctx;
+    private int frameTupleCount = 0;
+
+    protected FeedPolicyEnforcer policyEnforcer;
+
+    public FeedPolicyEnforcer getPolicyEnforcer() {
+        return policyEnforcer;
+    }
+
+    public void setFeedPolicyEnforcer(FeedPolicyEnforcer policyEnforcer) {
+        this.policyEnforcer = policyEnforcer;
+    }
+
+    /**
+     * @return the client from which records for the given partition are obtained
+     */
+    public abstract IFeedClient getFeedClient(int partition) throws Exception;
+
+    /**
+     * @return the policy through which {@link #start} forwards each ingested tuple
+     */
+    public abstract ITupleForwardPolicy getTupleParserPolicy();
+
+    public ClientBasedFeedAdapter(Map<String, String> configuration, IHyracksTaskContext ctx) {
+        this.ctx = ctx;
+        this.configuration = configuration;
+    }
+
+    /**
+     * @return the number of tuples ingested; updated only when the feed client reports NO_MORE_DATA
+     */
+    public long getIngestedRecordsCount() {
+        return tupleCount;
+    }
+
+    /**
+     * Pulls records from the feed client until it reports NO_MORE_DATA or {@link #stop()} is called, handing each
+     * record to the tuple forward policy.
+     *
+     * @throws Exception
+     *             wrapping the original failure when the policy enforcer does not allow ingestion to continue
+     */
+    @Override
+    public void start(int partition, IFrameWriter writer) throws Exception {
+        appender = new FrameTupleAppender();
+        frame = new VSizeFrame(ctx);
+        appender.reset(frame, true);
+        ITupleForwardPolicy policy = getTupleParserPolicy();
+        policy.configure(configuration);
+        pullBasedFeedClient = getFeedClient(partition);
+        InflowState inflowState = null;
+        policy.initialize(ctx, writer);
+        while (continueIngestion) {
+            tupleBuilder.reset();
+            try {
+                inflowState = pullBasedFeedClient.nextTuple(tupleBuilder.getDataOutput(), TIMEOUT_SECONDS);
+                switch (inflowState) {
+                    case DATA_AVAILABLE:
+                        tupleBuilder.addFieldEndOffset();
+                        policy.addTuple(tupleBuilder);
+                        frameTupleCount++;
+                        break;
+                    case NO_MORE_DATA:
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Reached end of feed");
+                        }
+                        policy.close();
+                        tupleCount += frameTupleCount;
+                        frameTupleCount = 0;
+                        continueIngestion = false;
+                        break;
+                    case DATA_NOT_AVAILABLE:
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Timed out on obtaining data from pull based adapter. Trying again!");
+                        }
+                        break;
+                }
+            } catch (Exception failureException) {
+                if (!shouldContinueAfterFailure(failureException)) {
+                    throw new Exception(failureException);
+                }
+                tupleBuilder.reset();
+            }
+        }
+        // NOTE(review): when the loop ends because stop() was called, policy.close() is never invoked, so tuples
+        // still buffered by the policy may not be flushed -- confirm whether the caller takes care of that.
+    }
+
+    /**
+     * Logs an ingestion failure and asks the policy enforcer whether ingestion may continue.
+     *
+     * @return true to resume ingestion; false if no policy enforcer is set or the enforcer declines
+     * @throws Exception
+     *             wrapping any exception raised by the policy enforcer itself
+     */
+    private boolean shouldContinueAfterFailure(Exception failureException) throws Exception {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.log(Level.WARNING, "Failure while ingesting data from feed client", failureException);
+        }
+        if (policyEnforcer == null) {
+            return false;
+        }
+        try {
+            return policyEnforcer.continueIngestionPostSoftwareFailure(failureException);
+        } catch (Exception recoveryException) {
+            throw new Exception(recoveryException);
+        }
+    }
+
+    /**
+     * Discontinue the ingestion of data and end the feed.
+     *
+     * @throws Exception
+     */
+    public void stop() throws Exception {
+        continueIngestion = false;
+    }
+
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
+
+    @Override
+    public boolean handleException(Exception e) {
+        return false;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FeedClient.java
similarity index 82%
rename from asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
rename to asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FeedClient.java
index 37d93ad..da2cde0 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FeedClient.java
@@ -20,6 +20,7 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.builders.IARecordBuilder;
+import edu.uci.ics.asterix.builders.OrderedListBuilder;
import edu.uci.ics.asterix.builders.RecordBuilder;
import edu.uci.ics.asterix.builders.UnorderedListBuilder;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
@@ -29,6 +30,7 @@
import edu.uci.ics.asterix.om.base.AInt32;
import edu.uci.ics.asterix.om.base.AMutableDateTime;
import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableOrderedList;
import edu.uci.ics.asterix.om.base.AMutablePoint;
import edu.uci.ics.asterix.om.base.AMutableRecord;
import edu.uci.ics.asterix.om.base.AMutableString;
@@ -36,18 +38,16 @@
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.base.IACursor;
import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.AOrderedListType;
import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.AUnorderedListType;
import edu.uci.ics.asterix.om.types.BuiltinType;
-import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
-public abstract class PullBasedFeedClient implements IPullBasedFeedClient {
+public abstract class FeedClient implements IFeedClient {
- protected static final Logger LOGGER = Logger.getLogger(PullBasedFeedClient.class.getName());
+ protected static final Logger LOGGER = Logger.getLogger(FeedClient.class.getName());
protected ARecordSerializerDeserializer recordSerDe;
protected AMutableRecord mutableRecord;
@@ -70,7 +70,7 @@
protected ISerializerDeserializer<AInt32> int32Serde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.AINT32);
- public abstract InflowState setNextRecord() throws Exception;
+ public abstract InflowState retrieveNextRecord() throws Exception;
@Override
public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException {
@@ -79,12 +79,9 @@
int waitCount = 0;
boolean continueWait = true;
while ((state == null || state.equals(InflowState.DATA_NOT_AVAILABLE)) && continueWait) {
- state = setNextRecord();
+ state = retrieveNextRecord();
switch (state) {
case DATA_AVAILABLE:
- IAType t = mutableRecord.getType();
- ATypeTag tag = t.getTypeTag();
- dataOutput.writeByte(tag.serialize());
recordBuilder.reset(mutableRecord.getType());
recordBuilder.init();
writeRecord(mutableRecord, dataOutput, recordBuilder);
@@ -94,7 +91,7 @@
continueWait = false;
} else {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Waiting to obtaing data from pull based adapter");
+ LOGGER.warning("Waiting to obtain data from pull based adaptor");
}
Thread.sleep(1000);
waitCount++;
@@ -121,30 +118,35 @@
writeObject(obj, fieldValue.getDataOutput());
recordBuilder.addField(pos, fieldValue);
}
- recordBuilder.write(dataOutput, false);
+ recordBuilder.write(dataOutput, true);
}
private void writeObject(IAObject obj, DataOutput dataOutput) throws IOException, AsterixException {
switch (obj.getType().getTypeTag()) {
- case RECORD:
- ATypeTag tag = obj.getType().getTypeTag();
- try {
- dataOutput.writeByte(tag.serialize());
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
+ case RECORD: {
IARecordBuilder recordBuilder = new RecordBuilder();
recordBuilder.reset((ARecordType) obj.getType());
recordBuilder.init();
writeRecord((AMutableRecord) obj, dataOutput, recordBuilder);
break;
- case UNORDEREDLIST:
- tag = obj.getType().getTypeTag();
- try {
- dataOutput.writeByte(tag.serialize());
- } catch (IOException e) {
- throw new HyracksDataException(e);
+ }
+
+ case ORDEREDLIST: {
+ OrderedListBuilder listBuilder = new OrderedListBuilder();
+ listBuilder.reset((AOrderedListType) ((AMutableOrderedList) obj).getType());
+ IACursor cursor = ((AMutableOrderedList) obj).getCursor();
+ ArrayBackedValueStorage listItemValue = new ArrayBackedValueStorage();
+ while (cursor.next()) {
+ listItemValue.reset();
+ IAObject item = cursor.get();
+ writeObject(item, listItemValue.getDataOutput());
+ listBuilder.addItem(listItemValue);
}
+ listBuilder.write(dataOutput, true);
+ break;
+ }
+
+ case UNORDEREDLIST: {
UnorderedListBuilder listBuilder = new UnorderedListBuilder();
listBuilder.reset((AUnorderedListType) ((AMutableUnorderedList) obj).getType());
IACursor cursor = ((AMutableUnorderedList) obj).getCursor();
@@ -155,8 +157,10 @@
writeObject(item, listItemValue.getDataOutput());
listBuilder.addItem(listItemValue);
}
- listBuilder.write(dataOutput, false);
+ listBuilder.write(dataOutput, true);
break;
+ }
+
default:
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(obj.getType()).serialize(obj,
dataOutput);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index c2a8a95..f9793f6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -17,9 +17,8 @@
import java.io.IOException;
import java.io.InputStream;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,23 +33,22 @@
public abstract InputStream getInputStream(int partition) throws IOException;
- protected final ITupleParser tupleParser;
+ protected final ITupleParserFactory parserFactory;
+ protected ITupleParser tupleParser;
protected final IAType sourceDatatype;
protected IHyracksTaskContext ctx;
public FileSystemBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
throws HyracksDataException {
- this.tupleParser = parserFactory.createTupleParser(ctx);
+ this.parserFactory = parserFactory;
this.sourceDatatype = sourceDatatype;
this.ctx = ctx;
}
@Override
public void start(int partition, IFrameWriter writer) throws Exception {
+ tupleParser = parserFactory.createTupleParser(ctx);
InputStream in = getInputStream(partition);
- if (tupleParser instanceof AbstractTupleParser) {
- ((AbstractTupleParser) tupleParser).setFilename(getFilename(partition));
- }
tupleParser.parse(in, writer);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 9f4b97c..8036a7b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
+
import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
import edu.uci.ics.asterix.external.indexing.input.GenericFileAwareRecordReader;
import edu.uci.ics.asterix.external.indexing.input.GenericRecordReader;
@@ -30,6 +31,7 @@
import edu.uci.ics.asterix.external.indexing.input.TextualFullScanDataReader;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -71,9 +73,9 @@
@Override
public InputStream getInputStream(int partition) throws IOException {
if ((conf.getInputFormat() instanceof TextInputFormat || conf.getInputFormat() instanceof SequenceFileInputFormat)
- && (HDFSAdapterFactory.FORMAT_ADM.equalsIgnoreCase((String) configuration
- .get(HDFSAdapterFactory.KEY_FORMAT)) || HDFSAdapterFactory.FORMAT_DELIMITED_TEXT
- .equalsIgnoreCase((String) configuration.get(HDFSAdapterFactory.KEY_FORMAT)))) {
+ && (AsterixTupleParserFactory.FORMAT_ADM.equalsIgnoreCase((String) configuration
+ .get(AsterixTupleParserFactory.KEY_FORMAT)) || AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT
+ .equalsIgnoreCase((String) configuration.get(AsterixTupleParserFactory.KEY_FORMAT)))) {
if (files != null) {
return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
} else {
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
index 9af1dd7..f730f28 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSIndexingAdapter.java
@@ -27,6 +27,7 @@
import edu.uci.ics.asterix.external.indexing.input.TextualDataReader;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
@@ -64,8 +65,8 @@
public InputStream getInputStream(int partition) throws IOException {
if (inputFormat.equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
return new RCFileDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
- } else if (format.equals(HDFSAdapterFactory.FORMAT_ADM)
- || format.equals(HDFSAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+ } else if (format.equals(AsterixTupleParserFactory.FORMAT_ADM)
+ || format.equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
return new TextualDataReader(inputSplits, readSchedule, nodeName, conf, executed, files);
} else {
return new GenericFileAwareRecordReader(inputSplits, readSchedule, nodeName, conf, executed, files);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java
new file mode 100644
index 0000000..02e60b4
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IFeedClient.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+/**
+ * A client that obtains records from an external data source on behalf of a feed adapter and writes them, one at
+ * a time, into a caller-supplied {@link DataOutput}.
+ */
+public interface IFeedClient {
+
+    /**
+     * Outcome of an attempt to obtain the next tuple from the external source.
+     */
+    public enum InflowState {
+        /** NOTE(review): presumably the source is exhausted or closed and will yield no more data; confirm. */
+        NO_MORE_DATA,
+        /** A tuple was obtained on this attempt. */
+        DATA_AVAILABLE,
+        /** No tuple was obtained on this attempt. */
+        DATA_NOT_AVAILABLE
+    }
+
+    /**
+     * Writes the next fetched tuple into the provided instance of {@link DataOutput}. Invocation of this method
+     * blocks until a new tuple has been written or the specified time has expired.
+     *
+     * @param dataOutput
+     *            The receiving channel for the feed client to write ADM records to.
+     * @param timeout
+     *            Threshold time (expressed in seconds) for the next tuple to be obtained from the external source.
+     * @return the {@link InflowState} describing the outcome of this attempt.
+     * @throws AsterixException
+     *             if the next tuple could not be obtained or written.
+     */
+    public InflowState nextTuple(DataOutput dataOutput, int timeout) throws AsterixException;
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 78554f2..b31e824 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -21,7 +21,6 @@
import java.io.InputStream;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AbstractTupleParser;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -58,15 +57,7 @@
}
}
- @Override
- public void start(int partition, IFrameWriter writer) throws Exception {
- InputStream in = getInputStream(partition);
- if (tupleParser instanceof AbstractTupleParser) {
- ((AbstractTupleParser) tupleParser).setFilename(getFilename(partition));
- }
- tupleParser.parse(in, writer);
- }
-
+
@Override
public String getFilename(int partition) {
final FileSplit fileSplit = fileSplits[partition];
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
index c739ca3..63181dc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAzureTwitterAdapter.java
@@ -9,7 +9,7 @@
import com.microsoft.windowsazure.services.core.storage.CloudStorageAccount;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -77,4 +77,9 @@
public DataExchangeMode getDataExchangeMode() {
return DataExchangeMode.PULL;
}
+
+ @Override
+ public boolean handleException(Exception e) {
+ return false;
+ }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index 838cfeb..27d7049 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -17,30 +17,33 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
* An adapter that provides the functionality of receiving tweets from the
* Twitter service in the form of ADM formatted records.
*/
-public class PullBasedTwitterAdapter extends PullBasedAdapter implements IFeedAdapter {
+public class PullBasedTwitterAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
private static final long serialVersionUID = 1L;
- public static final String QUERY = "query";
- public static final String INTERVAL = "interval";
+ private static final int DEFAULT_BATCH_SIZE = 5;
private ARecordType recordType;
private PullBasedTwitterFeedClient tweetClient;
@Override
- public IPullBasedFeedClient getFeedClient(int partition) {
+ public IFeedClient getFeedClient(int partition) {
return tweetClient;
}
- public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx) throws AsterixException {
+ public PullBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
+ throws AsterixException {
super(configuration, ctx);
tweetClient = new PullBasedTwitterFeedClient(ctx, recordType, this);
}
@@ -54,4 +57,20 @@
return DataExchangeMode.PULL;
}
+ @Override
+ public boolean handleException(Exception e) {
+ return true;
+ }
+
+ @Override
+ public ITupleForwardPolicy getTupleParserPolicy() {
+ configuration.put(ITupleForwardPolicy.PARSER_POLICY,
+ ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
+ String propValue = configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE);
+ if (propValue == null) {
+ configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, "" + DEFAULT_BATCH_SIZE);
+ }
+ return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2c8d659..2b68c88 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -16,93 +16,86 @@
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import twitter4j.Query;
import twitter4j.QueryResult;
-import twitter4j.Tweet;
+import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
-import twitter4j.TwitterFactory;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
-import edu.uci.ics.asterix.om.base.AMutableRecord;
-import edu.uci.ics.asterix.om.base.AMutableString;
-import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.external.util.TweetProcessor;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.external.util.TwitterUtil.SearchAPIConstants;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
- * An implementation of @see {PullBasedFeedClient} for the Twitter service.
- * The feed client fetches data from Twitter service by sending request at
- * regular (configurable) interval.
+ * An implementation of @see {PullBasedFeedClient} for the Twitter service. The
+ * feed client fetches data from Twitter service by sending request at regular
+ * (configurable) interval.
*/
-public class PullBasedTwitterFeedClient extends PullBasedFeedClient {
+public class PullBasedTwitterFeedClient extends FeedClient {
private String keywords;
private Query query;
private Twitter twitter;
- private int requestInterval = 10; // seconds
+ private int requestInterval = 5; // seconds
private QueryResult result;
- private IAObject[] mutableFields;
- private String[] tupleFieldValues;
private ARecordType recordType;
private int nextTweetIndex = 0;
+ private long lastTweetIdReceived = 0;
+ private TweetProcessor tweetProcessor;
public PullBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PullBasedTwitterAdapter adapter) {
- twitter = new TwitterFactory().getInstance();
- mutableFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableString(null),
- new AMutableString(null), new AMutableString(null) };
+ this.twitter = TwitterUtil.getTwitterService(adapter.getConfiguration());
this.recordType = recordType;
- recordSerDe = new ARecordSerializerDeserializer(recordType);
- mutableRecord = new AMutableRecord(recordType, mutableFields);
- tupleFieldValues = new String[recordType.getFieldNames().length];
- initialize(adapter.getConfiguration());
+ this.tweetProcessor = new TweetProcessor(recordType);
+ this.recordSerDe = new ARecordSerializerDeserializer(recordType);
+ this.mutableRecord = tweetProcessor.getMutableRecord();
+ this.initialize(adapter.getConfiguration());
}
public ARecordType getRecordType() {
return recordType;
}
- public AMutableRecord getMutableRecord() {
- return mutableRecord;
- }
-
@Override
- public InflowState setNextRecord() throws Exception {
- Tweet tweet;
+ public InflowState retrieveNextRecord() throws Exception {
+ Status tweet;
tweet = getNextTweet();
if (tweet == null) {
return InflowState.DATA_NOT_AVAILABLE;
}
- int numFields = recordType.getFieldNames().length;
- tupleFieldValues[0] = UUID.randomUUID().toString();
- tupleFieldValues[1] = tweet.getFromUser();
- tupleFieldValues[2] = tweet.getLocation() == null ? "" : tweet.getLocation();
- tupleFieldValues[3] = tweet.getText();
- tupleFieldValues[4] = tweet.getCreatedAt().toString();
- for (int i = 0; i < numFields; i++) {
- ((AMutableString) mutableFields[i]).setValue(tupleFieldValues[i]);
- mutableRecord.setValueAtPos(i, mutableFields[i]);
- }
+
+ tweetProcessor.processNextTweet(tweet);
return InflowState.DATA_AVAILABLE;
}
private void initialize(Map<String, String> params) {
- this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
- this.requestInterval = Integer.parseInt((String) params.get(PullBasedTwitterAdapter.INTERVAL));
+ this.keywords = (String) params.get(SearchAPIConstants.QUERY);
+ this.requestInterval = Integer.parseInt((String) params.get(SearchAPIConstants.INTERVAL));
this.query = new Query(keywords);
- query.setRpp(100);
+ this.query.setCount(100);
}
- private Tweet getNextTweet() throws TwitterException, InterruptedException {
+ private Status getNextTweet() throws TwitterException, InterruptedException {
if (result == null || nextTweetIndex >= result.getTweets().size()) {
Thread.sleep(1000 * requestInterval);
+ query.setSinceId(lastTweetIdReceived);
result = twitter.search(query);
nextTweetIndex = 0;
}
- List<Tweet> tw = result.getTweets();
- return tw.get(nextTweetIndex++);
+ if (result != null && !result.getTweets().isEmpty()) {
+ List<Status> tw = result.getTweets();
+ Status tweet = tw.get(nextTweetIndex++);
+ if (lastTweetIdReceived < tweet.getId()) {
+ lastTweetIdReceived = tweet.getId();
+ }
+ return tweet;
+ } else {
+ return null;
+ }
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
new file mode 100644
index 0000000..8bc9a37
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterAdapter.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.CounterTimerTupleForwardPolicy;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A feed adapter that receives tweets pushed by the Twitter streaming service through a
+ * {@link PushBasedTwitterFeedClient}.
+ */
+public class PushBasedTwitterAdapter extends ClientBasedFeedAdapter {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Batch size for the counter/timer forward policy when the configuration does not set one. */
+    private static final int DEFAULT_BATCH_SIZE = 50;
+
+    /** Single client shared by all partitions; see {@link #getFeedClient(int)}. */
+    private final PushBasedTwitterFeedClient tweetClient;
+
+    /**
+     * Creates the adapter together with its tweet client. Constructing the client opens the Twitter stream.
+     */
+    public PushBasedTwitterAdapter(Map<String, String> configuration, ARecordType recordType, IHyracksTaskContext ctx)
+            throws AsterixException {
+        super(configuration, ctx);
+        this.configuration = configuration;
+        this.tweetClient = new PushBasedTwitterFeedClient(ctx, recordType, this);
+    }
+
+    @Override
+    public DataExchangeMode getDataExchangeMode() {
+        return DataExchangeMode.PUSH;
+    }
+
+    /**
+     * Always returns {@code true}. NOTE(review): presumably this tells the feed runtime to carry on after
+     * {@code e}; confirm against the adapter contract.
+     */
+    @Override
+    public boolean handleException(Exception e) {
+        return true;
+    }
+
+    /** Returns the same client for every {@code partition}. */
+    @Override
+    public IFeedClient getFeedClient(int partition) throws Exception {
+        return tweetClient;
+    }
+
+    /**
+     * Builds the tuple-forward policy from {@code configuration} after forcing its type to
+     * {@code COUNTER_TIMER_EXPIRED} and, when no batch size is configured, setting it to
+     * {@link #DEFAULT_BATCH_SIZE}. Note that this mutates {@code configuration}.
+     */
+    @Override
+    public ITupleForwardPolicy getTupleParserPolicy() {
+        configuration.put(ITupleForwardPolicy.PARSER_POLICY,
+                ITupleForwardPolicy.TupleForwardPolicyType.COUNTER_TIMER_EXPIRED.name());
+        if (configuration.get(CounterTimerTupleForwardPolicy.BATCH_SIZE) == null) {
+            configuration.put(CounterTimerTupleForwardPolicy.BATCH_SIZE, String.valueOf(DEFAULT_BATCH_SIZE));
+        }
+        return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
new file mode 100644
index 0000000..908fd34
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PushBasedTwitterFeedClient.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.dataset.adapter;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARecordSerializerDeserializer;
+import edu.uci.ics.asterix.external.util.TweetProcessor;
+import edu.uci.ics.asterix.external.util.TwitterUtil;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A push-based {@link FeedClient} for the Twitter service. Constructing it opens a Twitter stream (filtered when the
+ * adapter configuration yields a {@link FilterQuery}, a sample stream otherwise); a listener enqueues every tweet the
+ * stream delivers, and {@link #retrieveNextRecord()} consumes them one at a time.
+ */
+public class PushBasedTwitterFeedClient extends FeedClient {
+
+    private static final Logger LOGGER = Logger.getLogger(PushBasedTwitterFeedClient.class.getName());
+
+    private final ARecordType recordType;
+    private final TweetProcessor tweetProcessor;
+    /** Tweets delivered by the stream listener and not yet consumed; unbounded. */
+    private final LinkedBlockingQueue<Status> inputQ;
+
+    public PushBasedTwitterFeedClient(IHyracksTaskContext ctx, ARecordType recordType, PushBasedTwitterAdapter adapter)
+            throws AsterixException {
+        this.recordType = recordType;
+        this.tweetProcessor = new TweetProcessor(recordType);
+        this.recordSerDe = new ARecordSerializerDeserializer(recordType);
+        this.mutableRecord = tweetProcessor.getMutableRecord();
+        // The queue must exist before the stream is started: the listener may deliver tweets right away.
+        this.inputQ = new LinkedBlockingQueue<Status>();
+        // NOTE(review): the stream is only held in a local, so nothing here ever shuts it down; confirm lifecycle.
+        TwitterStream twitterStream = TwitterUtil.getTwitterStream(adapter.getConfiguration());
+        twitterStream.addListener(new TweetListener(inputQ));
+        FilterQuery query = TwitterUtil.getFilterQuery(adapter.getConfiguration());
+        if (query != null) {
+            twitterStream.filter(query);
+        } else {
+            twitterStream.sample();
+        }
+    }
+
+    public ARecordType getRecordType() {
+        return recordType;
+    }
+
+    /**
+     * Blocks until the stream listener has delivered a tweet, then hands it to the {@link TweetProcessor} whose
+     * mutable record is this client's {@code mutableRecord}.
+     *
+     * @return always {@link InflowState#DATA_AVAILABLE}
+     */
+    @Override
+    public InflowState retrieveNextRecord() throws Exception {
+        Status tweet = inputQ.take();
+        tweetProcessor.processNextTweet(tweet);
+        return InflowState.DATA_AVAILABLE;
+    }
+
+    /**
+     * Stream listener that hands every received status to the consumer queue. It needs no state from the enclosing
+     * client, so it is a static nested class.
+     */
+    private static class TweetListener implements StatusListener {
+
+        private final LinkedBlockingQueue<Status> inputQ;
+
+        public TweetListener(LinkedBlockingQueue<Status> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            inputQ.add(tweet);
+        }
+
+        @Override
+        public void onException(Exception e) {
+            // Log rather than silently swallow, so stream failures can be diagnosed.
+            LOGGER.log(Level.WARNING, "Exception reported by the Twitter stream", e);
+        }
+
+        // The remaining notifications do not affect ingestion and are deliberately ignored.
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice deletionNotice) {
+        }
+
+        @Override
+        public void onScrubGeo(long userId, long upToStatusId) {
+        }
+
+        @Override
+        public void onStallWarning(StallWarning warning) {
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+        }
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 4eea034..bcf809d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -19,14 +19,16 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.parse.ITupleForwardPolicy;
import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
* RSSFeedAdapter provides the functionality of fetching an RSS based feed.
*/
-public class RSSFeedAdapter extends PullBasedAdapter implements IFeedAdapter {
+public class RSSFeedAdapter extends ClientBasedFeedAdapter implements IFeedAdapter {
private static final long serialVersionUID = 1L;
@@ -35,7 +37,7 @@
private List<String> feedURLs = new ArrayList<String>();
private String id_prefix = "";
- private IPullBasedFeedClient rssFeedClient;
+ private IFeedClient rssFeedClient;
private ARecordType recordType;
@@ -62,7 +64,7 @@
}
@Override
- public IPullBasedFeedClient getFeedClient(int partition) throws Exception {
+ public IFeedClient getFeedClient(int partition) throws Exception {
if (rssFeedClient == null) {
rssFeedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
}
@@ -78,4 +80,14 @@
return DataExchangeMode.PULL;
}
+ @Override
+ public boolean handleException(Exception e) {
+ return false;
+ }
+
+ @Override
+ public ITupleForwardPolicy getTupleParserPolicy() {
+ return AsterixTupleParserFactory.getTupleParserPolicy(configuration);
+ }
+
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
index 41ed923..e7cbd16 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedClient.java
@@ -39,7 +39,7 @@
* fetching from an RSS feed source at regular interval.
*/
@SuppressWarnings("rawtypes")
-public class RSSFeedClient extends PullBasedFeedClient {
+public class RSSFeedClient extends FeedClient {
private long id = 0;
private String idPrefix;
@@ -79,7 +79,7 @@
}
@Override
- public InflowState setNextRecord() throws Exception {
+ public InflowState retrieveNextRecord() throws Exception {
SyndEntryImpl feedEntry = getNextRSSFeed();
if (feedEntry == null) {
return InflowState.DATA_NOT_AVAILABLE;
@@ -133,9 +133,9 @@
class FetcherEventListenerImpl implements FetcherListener {
- private final IPullBasedFeedClient feedClient;
+ private final IFeedClient feedClient;
- public FetcherEventListenerImpl(IPullBasedFeedClient feedClient) {
+ public FetcherEventListenerImpl(IFeedClient feedClient) {
this.feedClient = feedClient;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
index 80965b0..25a0221 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/StreamBasedAdapter.java
@@ -5,7 +5,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -25,7 +25,7 @@
protected final IAType sourceDatatype;
- public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx)
+ public StreamBasedAdapter(ITupleParserFactory parserFactory, IAType sourceDatatype, IHyracksTaskContext ctx, int partition)
throws HyracksDataException {
this.tupleParser = parserFactory.createTupleParser(ctx);
this.sourceDatatype = sourceDatatype;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
index e2ef9fd..ca15b359 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java
@@ -24,6 +24,7 @@
import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -84,11 +85,11 @@
* 2. RC indexing tuple parser
* 3. textual data tuple parser
*/
- if (format.equalsIgnoreCase(StreamBasedAdapterFactory.FORMAT_ADM)) {
+ if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_ADM)) {
// choice 3 with adm data parser
ADMDataParser dataParser = new ADMDataParser();
return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser);
- } else if (format.equalsIgnoreCase(StreamBasedAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+ } else if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
// choice 3 with delimited data parser
DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype,
delimiter, quote);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
index c6176fc..7e0d9f9 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/indexing/dataflow/HDFSLookupAdapter.java
@@ -35,6 +35,7 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.operators.file.ADMDataParser;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
import edu.uci.ics.asterix.runtime.operators.file.DelimitedDataParser;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -81,7 +82,7 @@
// Create the lookup reader and the controlled parser
if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) {
configureRCFile(jobConf, iNullWriterFactory);
- } else if (configuration.get(HDFSAdapterFactory.KEY_FORMAT).equals(HDFSAdapterFactory.FORMAT_ADM)) {
+ } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_ADM)) {
// create an adm parser
ADMDataParser dataParser = new ADMDataParser();
if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) {
@@ -95,10 +96,10 @@
parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput,
inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory);
}
- } else if (configuration.get(HDFSAdapterFactory.KEY_FORMAT).equals(HDFSAdapterFactory.FORMAT_DELIMITED_TEXT)) {
+ } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT).equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) {
// create a delimited text parser
- char delimiter = StreamBasedAdapterFactory.getDelimiter(configuration);
- char quote = StreamBasedAdapterFactory.getQuote(configuration, delimiter);
+ char delimiter = AsterixTupleParserFactory.getDelimiter(configuration);
+ char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter);
DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser((ARecordType) atype,
delimiter, quote);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
index a148e66..57a8e48 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunction.java
@@ -90,9 +90,9 @@
castBuffer.reset();
ATypeHierarchy.convertNumericTypeByteArray(inputVal.getByteArray(), inputVal.getStartOffset(),
inputVal.getLength(), targetTypeTag, castBuffer.getDataOutput());
- functionHelper.setArgument(i, castBuffer.getByteArray());
+ functionHelper.setArgument(i, castBuffer);
} else {
- functionHelper.setArgument(i, inputVal.getByteArray());
+ functionHelper.setArgument(i, inputVal);
}
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
index fc629ea..d989323 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/ExternalFunctionProvider.java
@@ -14,9 +14,6 @@
*/
package edu.uci.ics.asterix.external.library;
-import java.util.HashMap;
-import java.util.Map;
-
import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
@@ -26,19 +23,14 @@
public class ExternalFunctionProvider {
- private static Map<IExternalFunctionInfo, ExternalScalarFunction> functionRepo = new HashMap<IExternalFunctionInfo, ExternalScalarFunction>();
-
public static IExternalFunction getExternalFunctionEvaluator(IExternalFunctionInfo finfo,
ICopyEvaluatorFactory args[], IDataOutputProvider outputProvider) throws AlgebricksException {
switch (finfo.getKind()) {
case SCALAR:
- ExternalScalarFunction function = functionRepo.get(finfo);
- function = new ExternalScalarFunction(finfo, args, outputProvider);
- // functionRepo.put(finfo, function);
- return function;
+ return new ExternalScalarFunction(finfo, args, outputProvider);
case AGGREGATE:
case UNNEST:
- throw new IllegalArgumentException(" not supported function kind" + finfo.getKind());
+ throw new IllegalArgumentException(" UDF of kind" + finfo.getKind() + " not supported.");
default:
throw new IllegalArgumentException(" unknown function kind" + finfo.getKind());
}
@@ -62,9 +54,10 @@
try {
setArguments(tuple);
evaluate(functionHelper);
+ functionHelper.reset();
} catch (Exception e) {
e.printStackTrace();
- throw new AlgebricksException(e);
+ //throw new AlgebricksException(e);
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
index 43eef52..6109588 100755
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/IFunctionHelper.java
@@ -29,4 +29,6 @@
public void setResult(IJObject result) throws IOException, AsterixException;
public IJObject getObject(JTypeTag jtypeTag);
+
+ public void reset();
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
index 3c5ddfd..4beb259b 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JTypeObjectFactory.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.asterix.external.library;
-import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.asterix.external.library.java.IJObject;
@@ -48,6 +47,11 @@
public class JTypeObjectFactory implements IObjectFactory<IJObject, IAType> {
+ public static final JTypeObjectFactory INSTANCE = new JTypeObjectFactory();
+
+ private JTypeObjectFactory() {
+ }
+
@Override
public IJObject create(IAType type) {
IJObject retValue = null;
@@ -77,7 +81,7 @@
retValue = new JPoint3D(0, 0, 0);
break;
case POLYGON:
- retValue = new JPolygon(new ArrayList<JPoint>());
+ retValue = new JPolygon(new JPoint[] {});
break;
case LINE:
retValue = new JLine(new JPoint(0, 0), new JPoint(0, 0));
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
index 192cf3e..7f5b3bc 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/JavaFunctionHelper.java
@@ -14,59 +14,55 @@
*/
package edu.uci.ics.asterix.external.library;
-import java.io.DataOutput;
import java.io.IOException;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
-import edu.uci.ics.asterix.builders.RecordBuilder;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.library.java.IJObject;
-import edu.uci.ics.asterix.external.library.java.JObjectUtil;
-import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleDataInputStream;
-import edu.uci.ics.asterix.external.library.java.JObjects.ByteArrayAccessibleInputStream;
-import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjectPointableVisitor;
import edu.uci.ics.asterix.external.library.java.JTypeTag;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
-import edu.uci.ics.asterix.om.base.ARecord;
-import edu.uci.ics.asterix.om.base.AString;
-import edu.uci.ics.asterix.om.base.IAObject;
import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
-import edu.uci.ics.asterix.om.types.ARecordType;
-import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.pointables.PointableAllocator;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.container.IObjectPool;
import edu.uci.ics.asterix.om.util.container.ListObjectPool;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
-import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.data.std.api.IValueReference;
public class JavaFunctionHelper implements IFunctionHelper {
private final IExternalFunctionInfo finfo;
private final IDataOutputProvider outputProvider;
- private IJObject[] arguments;
+ private final IJObject[] arguments;
private IJObject resultHolder;
- private ISerializerDeserializer resultSerde;
- private IObjectPool<IJObject, IAType> objectPool = new ListObjectPool<IJObject, IAType>(new JTypeObjectFactory());
- byte[] buffer = new byte[32768];
- ByteArrayAccessibleInputStream bis = new ByteArrayAccessibleInputStream(buffer, 0, buffer.length);
- ByteArrayAccessibleDataInputStream dis = new ByteArrayAccessibleDataInputStream(bis);
+ private final IObjectPool<IJObject, IAType> objectPool = new ListObjectPool<IJObject, IAType>(
+ JTypeObjectFactory.INSTANCE);
+ private final JObjectPointableVisitor pointableVisitor;
+ private final PointableAllocator pointableAllocator;
+ private final Map<Integer, TypeInfo> poolTypeInfo;
public JavaFunctionHelper(IExternalFunctionInfo finfo, IDataOutputProvider outputProvider)
throws AlgebricksException {
this.finfo = finfo;
this.outputProvider = outputProvider;
- List<IAType> params = finfo.getParamList();
- arguments = new IJObject[params.size()];
+ this.pointableVisitor = new JObjectPointableVisitor();
+ this.pointableAllocator = new PointableAllocator();
+ this.arguments = new IJObject[finfo.getParamList().size()];
int index = 0;
- for (IAType param : params) {
- this.arguments[index] = objectPool.allocate(param);
- index++;
+ for (IAType param : finfo.getParamList()) {
+ this.arguments[index++] = objectPool.allocate(param);
}
- resultHolder = objectPool.allocate(finfo.getReturnType());
+ this.resultHolder = objectPool.allocate(finfo.getReturnType());
+ this.poolTypeInfo = new HashMap<Integer, TypeInfo>();
+
}
@Override
@@ -76,110 +72,55 @@
@Override
public void setResult(IJObject result) throws IOException, AsterixException {
- IAObject obj = result.getIAObject();
try {
- outputProvider.getDataOutput().writeByte(obj.getType().getTypeTag().serialize());
- } catch (IOException e) {
+ result.serialize(outputProvider.getDataOutput(), true);
+ result.reset();
+ } catch (IOException | AlgebricksException e) {
throw new HyracksDataException(e);
}
-
- if (obj.getType().getTypeTag().equals(ATypeTag.RECORD)) {
- ARecordType recType = (ARecordType) obj.getType();
- if (recType.isOpen()) {
- writeOpenRecord((JRecord) result, outputProvider.getDataOutput());
- } else {
- resultSerde = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(recType);
- resultSerde.serialize(obj, outputProvider.getDataOutput());
- }
- } else {
- resultSerde = AqlSerializerDeserializerProvider.INSTANCE.getNonTaggedSerializerDeserializer(obj.getType());
- resultSerde.serialize(obj, outputProvider.getDataOutput());
- }
- reset();
}
- private void writeOpenRecord(JRecord jRecord, DataOutput dataOutput) throws AsterixException, IOException {
- ARecord aRecord = (ARecord) jRecord.getIAObject();
- RecordBuilder recordBuilder = new RecordBuilder();
- ARecordType recordType = aRecord.getType();
- recordBuilder.reset(recordType);
- ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage();
- ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
- List<Boolean> openField = jRecord.getOpenField();
-
- int fieldIndex = 0;
- int closedFieldId = 0;
- for (IJObject field : jRecord.getFields()) {
- fieldValue.reset();
- switch (field.getTypeTag()) {
- case RECORD:
- ARecordType recType = (ARecordType) field.getIAObject().getType();
- if (recType.isOpen()) {
- fieldValue.getDataOutput().writeByte(recType.getTypeTag().serialize());
- writeOpenRecord((JRecord) field, fieldValue.getDataOutput());
- } else {
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
- field.getIAObject().getType()).serialize(field.getIAObject(),
- fieldValue.getDataOutput());
- }
- break;
- default:
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(field.getIAObject().getType())
- .serialize(field.getIAObject(), fieldValue.getDataOutput());
- break;
- }
- if (openField.get(fieldIndex)) {
- String fName = jRecord.getFieldNames().get(fieldIndex);
- fieldName.reset();
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING).serialize(
- new AString(fName), fieldName.getDataOutput());
- recordBuilder.addField(fieldName, fieldValue);
- } else {
- recordBuilder.addField(closedFieldId, fieldValue);
- closedFieldId++;
- }
- fieldIndex++;
- }
-
- recordBuilder.write(dataOutput, false);
-
- }
-
- private void reset() {
- for (IJObject jObject : arguments) {
- switch (jObject.getTypeTag()) {
- case RECORD:
- reset((JRecord) jObject);
- break;
- }
- }
- switch (resultHolder.getTypeTag()) {
+ public void setArgument(int index, IValueReference valueReference) throws IOException, AsterixException {
+ IVisitablePointable pointable = null;
+ IJObject jObject = null;
+ IAType type = finfo.getParamList().get(index);
+ switch (type.getTypeTag()) {
case RECORD:
- reset((JRecord) resultHolder);
+ pointable = pointableAllocator.allocateRecordValue(type);
+ pointable.set(valueReference);
+ jObject = pointableVisitor.visit((ARecordPointable) pointable, getTypeInfo(index, type));
+ break;
+ case ORDEREDLIST:
+ case UNORDEREDLIST:
+ pointable = pointableAllocator.allocateListValue(type);
+ pointable.set(valueReference);
+ jObject = pointableVisitor.visit((AListPointable) pointable, getTypeInfo(index, type));
+ break;
+ case ANY:
+ throw new IllegalStateException("Cannot handle a function argument of type " + type.getTypeTag());
+ default:
+ pointable = pointableAllocator.allocateFieldValue(type);
+ pointable.set(valueReference);
+ jObject = pointableVisitor.visit((AFlatValuePointable) pointable, getTypeInfo(index, type));
break;
}
+ arguments[index] = jObject;
}
- private void reset(JRecord jRecord) {
- List<IJObject> fields = ((JRecord) jRecord).getFields();
- for (IJObject field : fields) {
- switch (field.getTypeTag()) {
- case RECORD:
- reset((JRecord) field);
- break;
- }
+ private TypeInfo getTypeInfo(int index, IAType type) {
+ TypeInfo typeInfo = poolTypeInfo.get(index);
+ if (typeInfo == null) {
+ typeInfo = new TypeInfo(objectPool, type, type.getTypeTag());
+ poolTypeInfo.put(index, typeInfo);
}
- jRecord.close();
- }
-
- public void setArgument(int index, byte[] argument) throws IOException, AsterixException {
- bis.setContent(argument, 1, argument.length);
- IAType type = finfo.getParamList().get(index);
- arguments[index] = JObjectUtil.getJType(type.getTypeTag(), type, dis, objectPool);
+ return typeInfo;
}
@Override
public IJObject getResultObject() {
+ if (resultHolder == null) {
+ resultHolder = objectPool.allocate(finfo.getReturnType());
+ }
return resultHolder;
}
@@ -197,4 +138,10 @@
return retValue;
}
+ @Override
+ public void reset() {
+ pointableAllocator.reset();
+ objectPool.reset();
+ }
+
}
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java
new file mode 100644
index 0000000..e2c66ca
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/TypeInfo.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.IJObject;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+
+/** Mutable holder pairing an object pool with a target type and its type tag. */
+public class TypeInfo {
+ private IObjectPool<IJObject, IAType> objectPool;
+ private IAType atype;
+ private ATypeTag typeTag;
+
+ public TypeInfo(IObjectPool<IJObject, IAType> objectPool, IAType atype, ATypeTag typeTag) {
+ this.objectPool = objectPool;
+ this.atype = atype;
+ this.typeTag = typeTag;
+ }
+
+ /** Points this holder at a new type/tag pair while keeping the current object pool. */
+ public void reset(IAType atype, ATypeTag typeTag) {
+ this.typeTag = typeTag;
+ this.atype = atype;
+ }
+
+ public IObjectPool<IJObject, IAType> getObjectPool() {
+ return objectPool;
+ }
+
+ public void setObjectPool(IObjectPool<IJObject, IAType> pool) {
+ objectPool = pool;
+ }
+
+ public IAType getAtype() {
+ return atype;
+ }
+
+ public void setAtype(IAType type) {
+ atype = type;
+ }
+
+ public ATypeTag getTypeTag() {
+ return typeTag;
+ }
+
+ public void setTypeTag(ATypeTag tag) {
+ typeTag = tag;
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java
new file mode 100644
index 0000000..87db84a
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJListAccessor.java
@@ -0,0 +1,12 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Converts an {@link AListPointable} of type {@code listType} into an {@link IJObject}. */
+public interface IJListAccessor {
+ public IJObject access(AListPointable pointable, IObjectPool<IJObject, IAType> objectPool,
+ IAType listType, JObjectPointableVisitor pointableVisitor) throws HyracksDataException;
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
index ff8e563..3567e7f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObject.java
@@ -14,12 +14,20 @@
*/
package edu.uci.ics.asterix.external.library.java;
+import java.io.DataOutput;
+
import edu.uci.ics.asterix.om.base.IAObject;
import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IJObject {
public ATypeTag getTypeTag();
public IAObject getIAObject();
+ /** Serializes this value into {@code dataOutput}; {@code writeTypeTag} selects whether the type tag is written too. */
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException;
+ /** Resets this object's state (JavaFunctionHelper.setResult calls it right after serializing a result). */
+ public void reset() throws AlgebricksException;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java
new file mode 100644
index 0000000..6967243
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJObjectAccessor.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Converts a flat (non-record, non-list) pointable into an {@link IJObject} drawn from the pool {@code obj}. */
+public interface IJObjectAccessor {
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> obj) throws HyracksDataException;
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java
new file mode 100644
index 0000000..55ae262
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/IJRecordAccessor.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Converts an {@link ARecordPointable} of type {@code recordType} into a {@link JRecord}. */
+public interface IJRecordAccessor {
+ JRecord access(ARecordPointable pointable, IObjectPool<IJObject, IAType> objectPool,
+ ARecordType recordType, JObjectPointableVisitor pointableVisitor)
+ throws HyracksDataException;
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
new file mode 100644
index 0000000..02d11b9
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectAccessors.java
@@ -0,0 +1,571 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADurationSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ALineSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APoint3DSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APointSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APolygonSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARectangleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
+import edu.uci.ics.asterix.external.library.TypeInfo;
+import edu.uci.ics.asterix.external.library.java.JObjects.JBoolean;
+import edu.uci.ics.asterix.external.library.java.JObjects.JByte;
+import edu.uci.ics.asterix.external.library.java.JObjects.JCircle;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDate;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDateTime;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDouble;
+import edu.uci.ics.asterix.external.library.java.JObjects.JDuration;
+import edu.uci.ics.asterix.external.library.java.JObjects.JFloat;
+import edu.uci.ics.asterix.external.library.java.JObjects.JInt;
+import edu.uci.ics.asterix.external.library.java.JObjects.JInterval;
+import edu.uci.ics.asterix.external.library.java.JObjects.JLine;
+import edu.uci.ics.asterix.external.library.java.JObjects.JList;
+import edu.uci.ics.asterix.external.library.java.JObjects.JLong;
+import edu.uci.ics.asterix.external.library.java.JObjects.JOrderedList;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint3D;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPolygon;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRectangle;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JTime;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.util.TweetProcessor;
+import edu.uci.ics.asterix.om.base.ACircle;
+import edu.uci.ics.asterix.om.base.ADuration;
+import edu.uci.ics.asterix.om.base.ALine;
+import edu.uci.ics.asterix.om.base.APoint;
+import edu.uci.ics.asterix.om.base.APoint3D;
+import edu.uci.ics.asterix.om.base.APolygon;
+import edu.uci.ics.asterix.om.base.ARectangle;
+import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.asterix.om.types.AbstractCollectionType;
+import edu.uci.ics.asterix.om.types.BuiltinType;
+import edu.uci.ics.asterix.om.types.EnumDeserializer;
+import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class JObjectAccessors {
+
+ /**
+ * Returns an accessor that converts a flat (non-record, non-list) pointable of the given type
+ * into a pooled {@link IJObject}, or {@code null} when no flat accessor exists for the tag.
+ */
+ public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
+ switch (aTypeTag) {
+ case BOOLEAN:
+ return new JBooleanAccessor();
+ case INT8:
+ return new JInt8Accessor();
+ case INT16:
+ return new JInt16Accessor();
+ case INT32:
+ return new JInt32Accessor();
+ case INT64:
+ return new JInt64Accessor();
+ case FLOAT:
+ return new JFloatAccessor();
+ case DOUBLE:
+ return new JDoubleAccessor();
+ case STRING:
+ return new JStringAccessor();
+ case POINT:
+ return new JPointAccessor();
+ case POINT3D:
+ return new JPoint3DAccessor();
+ case LINE:
+ return new JLineAccessor();
+ case DATE:
+ return new JDateAccessor();
+ case DATETIME:
+ return new JDateTimeAccessor();
+ case DURATION:
+ return new JDurationAccessor();
+ case TIME:
+ return new JTimeAccessor();
+ case INTERVAL:
+ return new JIntervalAccessor();
+ case CIRCLE:
+ return new JCircleAccessor();
+ case POLYGON:
+ return new JPolygonAccessor();
+ case RECTANGLE:
+ return new JRectangleAccessor();
+ default:
+ return null;
+ }
+ }
+
+ /** Converts an int8 value into a pooled {@link JByte}. */
+ public static class JInt8Accessor implements IJObjectAccessor {
+
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ byte o = AInt8SerializerDeserializer.getByte(b, s + 1); // s + 1 skips the type-tag byte
+ IJObject jObject = objectPool.allocate(BuiltinType.AINT8);
+ ((JByte) jObject).setValue(o);
+ return jObject;
+ }
+ }
+
+ /** Converts an int16 value into the pooled object allocated for AINT16. */
+ public static class JInt16Accessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ short i = AInt16SerializerDeserializer.getShort(b, s + 1); // s + 1 skips the type-tag byte
+ IJObject jObject = objectPool.allocate(BuiltinType.AINT16);
+ ((JInt) jObject).setValue(i); // NOTE(review): assumes the pool returns a JInt for AINT16 — confirm
+ return jObject;
+ }
+ }
+
+ /** Converts an int32 value into a pooled {@link JInt}. */
+ public static class JInt32Accessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ // The payload follows the one-byte type tag.
+ int payloadOffset = pointable.getStartOffset() + 1;
+ int value = AInt32SerializerDeserializer.getInt(pointable.getByteArray(), payloadOffset);
+ IJObject result = objectPool.allocate(BuiltinType.AINT32);
+ ((JInt) result).setValue(value);
+ return result;
+ }
+ }
+
+ /** Converts an int64 value into a pooled {@link JLong}. */
+ public static class JInt64Accessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ // The payload follows the one-byte type tag.
+ int payloadOffset = pointable.getStartOffset() + 1;
+ long value = AInt64SerializerDeserializer.getLong(pointable.getByteArray(), payloadOffset);
+ IJObject result = objectPool.allocate(BuiltinType.AINT64);
+ ((JLong) result).setValue(value);
+ return result;
+ }
+ }
+
+ /** Converts a float value into a pooled {@link JFloat}. */
+ public static class JFloatAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ // The payload follows the one-byte type tag.
+ int payloadOffset = pointable.getStartOffset() + 1;
+ float value = AFloatSerializerDeserializer.getFloat(pointable.getByteArray(), payloadOffset);
+ IJObject result = objectPool.allocate(BuiltinType.AFLOAT);
+ ((JFloat) result).setValue(value);
+ return result;
+ }
+ }
+
+ /** Converts a double value into a pooled {@link JDouble}. */
+ public static class JDoubleAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ // The payload follows the one-byte type tag.
+ int payloadOffset = pointable.getStartOffset() + 1;
+ double value = ADoubleSerializerDeserializer.getDouble(pointable.getByteArray(), payloadOffset);
+ IJObject result = objectPool.allocate(BuiltinType.ADOUBLE);
+ ((JDouble) result).setValue(value);
+ return result;
+ }
+ }
+
+ /** Converts a string value into a pooled {@link JString}. */
+ public static class JStringAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int l = pointable.getLength();
+ // Skip the leading type-tag byte; the non-tagged serde reads only the payload.
+ String v = AStringSerializerDeserializer.INSTANCE.deserialize(
+ new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1))).getStringValue();
+ // NOTE(review): every string value goes through TweetProcessor normalization, which looks
+ // feed-specific for a generic accessor — confirm this is intended.
+ String normalized = TweetProcessor.getNormalizedString(v);
+ IJObject jObject = objectPool.allocate(BuiltinType.ASTRING);
+ ((JString) jObject).setValue(normalized);
+ return jObject;
+ }
+ }
+
+ /** Converts a boolean value into a pooled {@link JBoolean}; offset s holds the type tag, not the value. */
+ public static class JBooleanAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ boolean v = ABooleanSerializerDeserializer.getBoolean(b, s + 1); // s + 1 skips the type-tag byte
+ IJObject jObject = objectPool.allocate(BuiltinType.ABOOLEAN);
+ ((JBoolean) jObject).setValue(v);
+ return jObject;
+ }
+ }
+
+ /** Converts a date value into a pooled {@link JDate}; the chronon starts after the type-tag byte. */
+ public static class JDateAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int v = ADateSerializerDeserializer.getChronon(b, s + 1); // s + 1 skips the type-tag byte
+ IJObject jObject = objectPool.allocate(BuiltinType.ADATE);
+ ((JDate) jObject).setValue(v);
+ return jObject;
+ }
+ }
+
+ /** Converts a datetime value into a pooled {@link JDateTime}; the chronon starts after the type-tag byte. */
+ public static class JDateTimeAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ long v = ADateTimeSerializerDeserializer.getChronon(b, s + 1); // s + 1 skips the type-tag byte
+ IJObject jObject = objectPool.allocate(BuiltinType.ADATETIME);
+ ((JDateTime) jObject).setValue(v);
+ return jObject;
+ }
+ }
+
+ /** Converts a duration value into a pooled {@link JDuration}; the non-tagged serde reads only the payload. */
+ public static class JDurationAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int l = pointable.getLength();
+ ADuration duration = ADurationSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(b, s + 1, l - 1))); // skip the type-tag byte
+ IJObject jObject = objectPool.allocate(BuiltinType.ADURATION);
+ ((JDuration) jObject).setValue(duration.getMonths(), duration.getMilliseconds());
+ return jObject;
+ }
+ }
+
+ /** Converts a time value into a pooled {@link JTime}; the chronon starts after the type-tag byte. */
+ public static class JTimeAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int v = ATimeSerializerDeserializer.getChronon(b, s + 1); // s + 1 skips the type-tag byte
+ IJObject jObject = objectPool.allocate(BuiltinType.ATIME);
+ ((JTime) jObject).setValue(v);
+ return jObject;
+ }
+ }
+
+ /** Converts an interval value into a pooled {@link JInterval}; AlgebricksException is rethrown as HyracksDataException. */
+ public static class JIntervalAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset() + 1; // the payload follows the one-byte type tag
+ long intervalStart = AIntervalSerializerDeserializer.getIntervalStart(b, s);
+ long intervalEnd = AIntervalSerializerDeserializer.getIntervalEnd(b, s);
+ byte intervalType = AIntervalSerializerDeserializer.getIntervalTimeType(b, s);
+ IJObject jObject = objectPool.allocate(BuiltinType.AINTERVAL);
+ try {
+ ((JInterval) jObject).setValue(intervalStart, intervalEnd, intervalType);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ return jObject;
+ }
+ }
+
+ // Spatial Types
+
+ /** Converts a circle value into a pooled {@link JCircle} whose center is a pooled {@link JPoint}. */
+ public static class JCircleAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int l = pointable.getLength();
+ ACircle v = ACircleSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(b, s + 1, l - 1))); // skip the type-tag byte
+ JPoint jpoint = (JPoint) objectPool.allocate(BuiltinType.APOINT);
+ jpoint.setValue(v.getP().getX(), v.getP().getY());
+ IJObject jObject = objectPool.allocate(BuiltinType.ACIRCLE);
+ ((JCircle) jObject).setValue(jpoint, v.getRadius());
+ return jObject;
+ }
+ }
+
+ /** Converts a point value into a pooled {@link JPoint}; the non-tagged serde reads only the payload. */
+ public static class JPointAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int l = pointable.getLength();
+ APoint v = APointSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(new ByteArrayInputStream(
+ b, s + 1, l - 1))); // skip the type-tag byte
+ JPoint jObject = (JPoint) objectPool.allocate(BuiltinType.APOINT);
+ jObject.setValue(v.getX(), v.getY());
+ return jObject;
+ }
+ }
+
+ /** Converts a 3D point value into a pooled {@link JPoint3D}; the non-tagged serde reads only the payload. */
+ public static class JPoint3DAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int l = pointable.getLength();
+ APoint3D v = APoint3DSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(b, s + 1, l - 1))); // skip the type-tag byte
+ JPoint3D jObject = (JPoint3D) objectPool.allocate(BuiltinType.APOINT3D);
+ jObject.setValue(v.getX(), v.getY(), v.getZ());
+ return jObject;
+ }
+ }
+
+ /** Converts a line value into a pooled {@link JLine}; the non-tagged serde reads only the payload. */
+ public static class JLineAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int l = pointable.getLength();
+ ALine v = ALineSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(new ByteArrayInputStream(b,
+ s + 1, l - 1))); // skip the type-tag byte
+ JLine jObject = (JLine) objectPool.allocate(BuiltinType.ALINE);
+ jObject.setValue(v.getP1(), v.getP2());
+ return jObject;
+ }
+ }
+
+ /** Converts a polygon value into a pooled {@link JPolygon}; the non-tagged serde reads only the payload. */
+ public static class JPolygonAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int l = pointable.getLength();
+ APolygon v = APolygonSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(b, s + 1, l - 1))); // skip the type-tag byte
+ JPolygon jObject = (JPolygon) objectPool.allocate(BuiltinType.APOLYGON);
+ jObject.setValue(v.getPoints());
+ return jObject;
+ }
+ }
+
+ /** Converts a rectangle value into a pooled {@link JRectangle}; the non-tagged serde reads only the payload. */
+ public static class JRectangleAccessor implements IJObjectAccessor {
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int l = pointable.getLength();
+ ARectangle v = ARectangleSerializerDeserializer.INSTANCE.deserialize(new DataInputStream(
+ new ByteArrayInputStream(b, s + 1, l - 1))); // skip the type-tag byte
+ JRectangle jObject = (JRectangle) objectPool.allocate(BuiltinType.ARECTANGLE);
+ jObject.setValue(v.getP1(), v.getP2());
+ return jObject;
+ }
+ }
+
+ /**
+ * Converts an {@link ARecordPointable} into a reusable {@link JRecord}: closed fields fill the
+ * record positionally, open fields are collected by name. NOTE(review): openFields is never
+ * attached to the returned JRecord, so open fields are currently dropped — confirm intent.
+ */
+ public static class JRecordAccessor implements IJRecordAccessor {
+
+ private final TypeInfo typeInfo;
+ private final JRecord jRecord;
+ private final IJObject[] jObjects; // closed-field slots; the same array is passed to jRecord
+ private final LinkedHashMap<String, IJObject> openFields;
+
+ public JRecordAccessor(ARecordType recordType, IObjectPool<IJObject, IAType> objectPool) {
+ this.typeInfo = new TypeInfo(objectPool, null, null);
+ this.jObjects = new IJObject[recordType.getFieldNames().length];
+ this.jRecord = new JRecord(recordType, jObjects);
+ this.openFields = new LinkedHashMap<String, IJObject>();
+ }
+
+ @Override
+ public JRecord access(ARecordPointable pointable, IObjectPool<IJObject, IAType> objectPool,
+ ARecordType recordType, JObjectPointableVisitor pointableVisitor) throws HyracksDataException {
+ reset(); // also drops open fields left over from the previous call
+ List<IVisitablePointable> fieldPointables = pointable.getFieldValues();
+ List<IVisitablePointable> fieldTypeTags = pointable.getFieldTypeTags();
+ List<IVisitablePointable> fieldNames = pointable.getFieldNames();
+ IAType[] closedFieldTypes = recordType.getFieldTypes();
+ int index = 0;
+ try {
+ for (IVisitablePointable fieldPointable : fieldPointables) {
+ // The first closedFieldTypes.length fields are treated as the closed part.
+ boolean closedPart = index < closedFieldTypes.length;
+ IAType fieldType = closedPart ? closedFieldTypes[index] : null;
+ IVisitablePointable tt = fieldTypeTags.get(index);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tt.getByteArray()[tt
+ .getStartOffset()]);
+ IVisitablePointable fieldName = fieldNames.get(index);
+ typeInfo.reset(fieldType, typeTag);
+ IJObject fieldObject = null;
+ switch (typeTag) {
+ case RECORD:
+ fieldObject = pointableVisitor.visit((ARecordPointable) fieldPointable, typeInfo);
+ break;
+ case ORDEREDLIST:
+ case UNORDEREDLIST:
+ // A list field backed by a flat pointable is a null value; leave fieldObject null.
+ if (!(fieldPointable instanceof AFlatValuePointable)) {
+ fieldObject = pointableVisitor.visit((AListPointable) fieldPointable, typeInfo);
+ }
+ break;
+ case ANY:
+ break; // no conversion; the field stays null
+ default:
+ fieldObject = pointableVisitor.visit((AFlatValuePointable) fieldPointable, typeInfo);
+ }
+ if (closedPart) {
+ jObjects[index] = fieldObject;
+ } else {
+ // Field names are tagged strings: skip the tag byte before decoding.
+ byte[] b = fieldName.getByteArray();
+ int s = fieldName.getStartOffset();
+ int l = fieldName.getLength();
+ String v = AStringSerializerDeserializer.INSTANCE.deserialize(
+ new DataInputStream(new ByteArrayInputStream(b, s + 1, l - 1))).getStringValue();
+ openFields.put(v, fieldObject);
+ }
+ index++;
+ }
+ } catch (HyracksDataException e) {
+ throw e; // already the declared type; don't wrap it a second time
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ return jRecord;
+ }
+
+ /** Clears the reusable record and the collected open fields. */
+ public void reset() throws HyracksDataException {
+ try {
+ jRecord.reset();
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ openFields.clear();
+ }
+
+ }
+
+ public static class JListAccessor implements IJListAccessor {
+ // Converts a list pointable into a fresh JOrderedList/JUnorderedList, dispatching each item on its own serialized tag.
+ private final TypeInfo typeInfo;
+
+ public JListAccessor(IObjectPool<IJObject, IAType> objectPool) {
+ this.typeInfo = new TypeInfo(objectPool, null, null);
+ }
+ // listType is the list's collection type; it may be null for open-part fields (JRecordAccessor passes a null type there).
+ @Override
+ public IJObject access(AListPointable pointable, IObjectPool<IJObject, IAType> objectPool, IAType listType,
+ JObjectPointableVisitor pointableVisitor) throws HyracksDataException {
+ List<IVisitablePointable> items = pointable.getItems();
+ List<IVisitablePointable> itemTags = pointable.getItemTags();
+ IAType itemType = listType instanceof AbstractCollectionType ? ((AbstractCollectionType) listType).getItemType() : null;
+ JList list = pointable.ordered() ? new JOrderedList(itemType) : new JUnorderedList(itemType); // the IAType constructors take the ITEM type
+ IJObject listItem = null;
+ int index = 0;
+ try {
+ for (IVisitablePointable itemPointable : items) {
+ IVisitablePointable itemTagPointable = itemTags.get(index);
+ ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(itemTagPointable
+ .getByteArray()[itemTagPointable.getStartOffset()]);
+ typeInfo.reset(itemType, itemTypeTag); // describe this element, not the enclosing list
+ switch (itemTypeTag) {
+ case RECORD:
+ listItem = pointableVisitor.visit((ARecordPointable) itemPointable, typeInfo);
+ break;
+ case UNORDEREDLIST:
+ case ORDEREDLIST:
+ listItem = pointableVisitor.visit((AListPointable) itemPointable, typeInfo);
+ break;
+ case ANY:
+ throw new IllegalArgumentException("Cannot parse list item of type "
+ + itemTypeTag);
+ default:
+ // Flat item: the visitor picks its accessor from typeInfo's tag, which is now this item's own tag.
+ listItem = pointableVisitor.visit((AFlatValuePointable) itemPointable, typeInfo);
+ break;
+
+ }
+ // itemTags runs parallel to items: advance so the next item is decoded with its own tag.
+ index++;
+
+ list.add(listItem);
+ }
+ } catch (AsterixException exception) {
+ throw new HyracksDataException(exception);
+ }
+ return list;
+ }
+ }
+
+ public static class JUnorderedListAccessor implements IJObjectAccessor {
+ // Placeholder flat accessor: access() ignores both arguments and always returns null.
+ @Override
+ public IJObject access(IVisitablePointable pointable, IObjectPool<IJObject, IAType> objectPool)
+ throws HyracksDataException {
+ return null; // NOTE(review): stub; list pointables are converted by JListAccessor via JObjectPointableVisitor.visit(AListPointable, ...). Confirm nothing relies on this accessor.
+ }
+
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectPointableVisitor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectPointableVisitor.java
new file mode 100644
index 0000000..0f4ce58
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectPointableVisitor.java
@@ -0,0 +1,75 @@
+package edu.uci.ics.asterix.external.library.java;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.external.library.TypeInfo;
+import edu.uci.ics.asterix.external.library.java.JObjectAccessors.JListAccessor;
+import edu.uci.ics.asterix.external.library.java.JObjectAccessors.JRecordAccessor;
+import edu.uci.ics.asterix.om.pointables.AFlatValuePointable;
+import edu.uci.ics.asterix.om.pointables.AListPointable;
+import edu.uci.ics.asterix.om.pointables.ARecordPointable;
+import edu.uci.ics.asterix.om.pointables.base.IVisitablePointable;
+import edu.uci.ics.asterix.om.pointables.visitor.IVisitablePointableVisitor;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.asterix.om.types.ATypeTag;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class JObjectPointableVisitor implements IVisitablePointableVisitor<IJObject, TypeInfo> {
+
+ // Accessor caches: flat accessors are keyed by type tag; record/list accessors are keyed by the visited pointable.
+ private final Map<ATypeTag, IJObjectAccessor> flatAccessorsByTag = new HashMap<ATypeTag, IJObjectAccessor>();
+ private final Map<IVisitablePointable, IJRecordAccessor> recordAccessors = new HashMap<IVisitablePointable, IJRecordAccessor>();
+ private final Map<IVisitablePointable, IJListAccessor> listAccessors = new HashMap<IVisitablePointable, IJListAccessor>();
+
+ // Lists: look up (or lazily create) the accessor for this pointable, then convert; any failure surfaces as AsterixException.
+ @Override
+ public IJObject visit(AListPointable accessor, TypeInfo arg) throws AsterixException {
+ IJListAccessor listAccessor = listAccessors.get(accessor);
+ if (listAccessor == null) {
+ listAccessor = new JListAccessor(arg.getObjectPool());
+ listAccessors.put(accessor, listAccessor);
+ }
+
+ try {
+ return listAccessor.access(accessor, arg.getObjectPool(), arg.getAtype(), this);
+ } catch (Exception e) {
+ throw new AsterixException(e);
+ }
+ }
+
+ // Records: same caching scheme as lists; the expected type carried by arg is cast to ARecordType inside the try.
+ @Override
+ public IJObject visit(ARecordPointable accessor, TypeInfo arg) throws AsterixException {
+ IJRecordAccessor recordAccessor = recordAccessors.get(accessor);
+ if (recordAccessor == null) {
+ recordAccessor = new JRecordAccessor(accessor.getInputRecordType(), arg.getObjectPool());
+ recordAccessors.put(accessor, recordAccessor);
+ }
+
+ try {
+ return recordAccessor.access(accessor, arg.getObjectPool(), (ARecordType) arg.getAtype(), this);
+ } catch (Exception e) {
+ throw new AsterixException(e);
+ }
+ }
+
+ // Flat values: one accessor per type tag taken from arg, created on first use; only HyracksDataException is wrapped.
+ @Override
+ public IJObject visit(AFlatValuePointable accessor, TypeInfo arg) throws AsterixException {
+ ATypeTag tag = arg.getTypeTag();
+ IJObjectAccessor flatAccessor = flatAccessorsByTag.get(tag);
+ if (flatAccessor == null) {
+ flatAccessor = JObjectAccessors.createFlatJObjectAccessor(tag);
+ flatAccessorsByTag.put(tag, flatAccessor);
+ }
+
+ try {
+ return flatAccessor.access(accessor, arg.getObjectPool());
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ }
+ }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
index ff662ae..f5f404a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjectUtil.java
@@ -42,6 +42,7 @@
import edu.uci.ics.asterix.external.library.java.JObjects.JString;
import edu.uci.ics.asterix.external.library.java.JObjects.JTime;
import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.om.base.APoint;
import edu.uci.ics.asterix.om.types.AOrderedListType;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
@@ -52,6 +53,7 @@
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.om.util.container.IObjectPool;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class JObjectUtil {
@@ -129,7 +131,11 @@
long start = dis.readLong();
long end = dis.readLong();
byte intervalType = dis.readByte();
- ((JInterval) jObject).setValue(start, end, intervalType);
+ try {
+ ((JInterval) jObject).setValue(start, end, intervalType);
+ } catch (AlgebricksException e) {
+ throw new AsterixException(e);
+ }
break;
}
@@ -184,7 +190,7 @@
p1.setValue(dis.readDouble(), dis.readDouble());
points.add(p1);
}
- ((JPolygon) jObject).setValue(points);
+ ((JPolygon) jObject).setValue(points.toArray(new APoint[]{}));
break;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
index 191fda6..61a60b6 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/library/java/JObjects.java
@@ -16,21 +16,56 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import edu.uci.ics.asterix.builders.IAsterixListBuilder;
+import edu.uci.ics.asterix.builders.RecordBuilder;
+import edu.uci.ics.asterix.builders.UnorderedListBuilder;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ACircleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ADurationSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ALineSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APoint3DSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APointSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.APolygonSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ARectangleSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import edu.uci.ics.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer;
import edu.uci.ics.asterix.om.base.ABoolean;
+import edu.uci.ics.asterix.om.base.ADouble;
+import edu.uci.ics.asterix.om.base.AFloat;
+import edu.uci.ics.asterix.om.base.AInt16;
+import edu.uci.ics.asterix.om.base.AInt32;
+import edu.uci.ics.asterix.om.base.AInt64;
+import edu.uci.ics.asterix.om.base.AInt8;
import edu.uci.ics.asterix.om.base.AMutableCircle;
import edu.uci.ics.asterix.om.base.AMutableDate;
import edu.uci.ics.asterix.om.base.AMutableDateTime;
import edu.uci.ics.asterix.om.base.AMutableDouble;
import edu.uci.ics.asterix.om.base.AMutableDuration;
import edu.uci.ics.asterix.om.base.AMutableFloat;
+import edu.uci.ics.asterix.om.base.AMutableInt16;
import edu.uci.ics.asterix.om.base.AMutableInt32;
import edu.uci.ics.asterix.om.base.AMutableInt64;
+import edu.uci.ics.asterix.om.base.AMutableInt8;
import edu.uci.ics.asterix.om.base.AMutableInterval;
import edu.uci.ics.asterix.om.base.AMutableLine;
import edu.uci.ics.asterix.om.base.AMutableOrderedList;
@@ -43,15 +78,17 @@
import edu.uci.ics.asterix.om.base.AMutableTime;
import edu.uci.ics.asterix.om.base.AMutableUnorderedList;
import edu.uci.ics.asterix.om.base.APoint;
+import edu.uci.ics.asterix.om.base.ARectangle;
+import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.base.IAObject;
import edu.uci.ics.asterix.om.types.AOrderedListType;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.AUnorderedListType;
-import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
public class JObjects {
@@ -73,26 +110,84 @@
public IAObject getIAObject() {
return value;
}
+
}
- public static final class JInt implements IJObject {
+ public static final class JByte extends JObject { // Java-side wrapper for an int8 value; state is the inherited JObject value (an AMutableInt8).
- private AMutableInt32 value;
+ public JByte(byte value) {
+ super(new AMutableInt8(value));
+ }
+ // setValue/getValue mutate and read the AMutableInt8 held in the inherited value field.
+ public void setValue(byte v) {
+ ((AMutableInt8) value).setValue(v);
+ }
+
+ public byte getValue() {
+ return ((AMutableInt8) value).getByteValue();
+ }
+ // Writes an optional one-byte type tag (taken from the value's own type), then the int8 payload.
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e); // surface I/O failures as HyracksDataException
+ }
+ }
+ AInt8SerializerDeserializer.INSTANCE.serialize((AInt8) value, dataOutput);
+ }
+ // Restores the value to 0.
+ @Override
+ public void reset() {
+ ((AMutableInt8) value).setValue((byte) 0);
+ }
+ }
+
+ public static final class JShort extends JObject {
+ // Java-side wrapper for an int16 value, stored as an AMutableInt16 in the inherited JObject value field.
+ // No own 'value' field here: a private one would hide JObject's, stay null, and make every method below throw NPE.
+
+ public JShort(short value) {
+ super(new AMutableInt16(value));
+ }
+
+ public void setValue(short v) { // takes short; byte arguments still widen, so existing callers keep compiling
+ ((AMutableInt16) value).setValue(v);
+ }
+
+ public short getValue() {
+ return ((AMutableInt16) value).getShortValue();
+ }
+ // Writes an optional one-byte type tag (taken from the value's own type), then the int16 payload.
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ AInt16SerializerDeserializer.INSTANCE.serialize((AInt16) value, dataOutput);
+ }
+ // Restores the value to 0.
+ @Override
+ public void reset() {
+ ((AMutableInt16) value).setValue((short) 0);
+ }
+
+ }
+
+ public static final class JInt extends JObject {
public JInt(int value) {
- this.value = new AMutableInt32(value);
+ super(new AMutableInt32(value));
}
public void setValue(int v) {
- if (value == null) {
- value = new AMutableInt32(v);
- } else {
- ((AMutableInt32) value).setValue(v);
- }
- }
-
- public void setValue(AMutableInt32 v) {
- value = v;
+ ((AMutableInt32) value).setValue(v);
}
public int getValue() {
@@ -100,15 +195,21 @@
}
@Override
- public ATypeTag getTypeTag() {
- return BuiltinType.AINT32.getTypeTag();
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ AInt32SerializerDeserializer.INSTANCE.serialize((AInt32) value, dataOutput);
}
@Override
- public IAObject getIAObject() {
- return value;
+ public void reset() {
+ ((AMutableInt32) value).setValue(0);
}
-
}
public static final class JBoolean implements IJObject {
@@ -133,6 +234,23 @@
return value ? ABoolean.TRUE : ABoolean.FALSE;
}
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.BOOLEAN.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ABooleanSerializerDeserializer.INSTANCE.serialize((ABoolean) getIAObject(), dataOutput);
+ }
+
+ @Override
+ public void reset() {
+ value = false;
+ }
+
}
public static final class JLong extends JObject {
@@ -149,6 +267,23 @@
return ((AMutableInt64) value).getLongValue();
}
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ AInt64SerializerDeserializer.INSTANCE.serialize((AInt64) value, dataOutput);
+ }
+
+ @Override
+ public void reset() {
+ ((AMutableInt64) value).setValue(0);
+ }
+
}
public static final class JDouble extends JObject {
@@ -165,6 +300,23 @@
return ((AMutableDouble) value).getDoubleValue();
}
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ADoubleSerializerDeserializer.INSTANCE.serialize((ADouble) value, dataOutput);
+ }
+
+ @Override
+ public void reset() {
+ ((AMutableDouble) value).setValue(0);
+ }
+
}
public static final class JString extends JObject {
@@ -181,14 +333,29 @@
return ((AMutableString) value).getStringValue();
}
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ AStringSerializerDeserializer.INSTANCE.serialize((AString) value, dataOutput);
+ }
+
+ @Override
+ public void reset() {
+ ((AMutableString) value).setValue("");
+ }
+
}
- public static final class JFloat implements IJObject {
-
- private AMutableFloat value;
+ public static final class JFloat extends JObject {
public JFloat(float v) {
- value = new AMutableFloat(v);
+ super(new AMutableFloat(v));
}
public void setValue(float v) {
@@ -200,13 +367,20 @@
}
@Override
- public ATypeTag getTypeTag() {
- return BuiltinType.AFLOAT.getTypeTag();
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ AFloatSerializerDeserializer.INSTANCE.serialize((AFloat) value, dataOutput);
}
@Override
- public IAObject getIAObject() {
- return value;
+ public void reset() {
+ ((AMutableFloat) value).setValue(0);
}
}
@@ -237,18 +411,37 @@
public String toString() {
return value.toString();
}
+
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ APointSerializerDeserializer.INSTANCE.serialize((APoint) value, dataOutput);
+ }
+
+ @Override
+ public void reset() {
+ ((AMutablePoint) value).setValue(0, 0);
+ }
}
- public static final class JRectangle implements IJObject {
-
- private AMutableRectangle rect;
+ public static final class JRectangle extends JObject {
public JRectangle(JPoint p1, JPoint p2) {
- rect = new AMutableRectangle((APoint) p1.getValue(), (APoint) p2.getValue());
+ super(new AMutableRectangle((APoint) p1.getIAObject(), (APoint) p2.getIAObject()));
}
public void setValue(JPoint p1, JPoint p2) {
- this.rect.setValue((APoint) p1.getValue(), (APoint) p2.getValue());
+ ((AMutableRectangle) value).setValue((APoint) p1.getValue(), (APoint) p2.getValue());
+ }
+
+ public void setValue(APoint p1, APoint p2) {
+ ((AMutableRectangle) value).setValue(p1, p2);
}
@Override
@@ -257,190 +450,200 @@
}
@Override
- public IAObject getIAObject() {
- return rect;
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(value.getType().getTypeTag().serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ARectangleSerializerDeserializer.INSTANCE.serialize((ARectangle) value, dataOutput);
}
@Override
- public String toString() {
- return rect.toString();
+ public void reset() {
}
}
- public static final class JTime implements IJObject {
-
- private AMutableTime time;
+ public static final class JTime extends JObject { // Java-side wrapper for a time value; state is the inherited JObject value (an AMutableTime).
public JTime(int timeInMillsec) {
- time = new AMutableTime(timeInMillsec);
+ super(new AMutableTime(timeInMillsec));
}
public void setValue(int timeInMillsec) {
- time.setValue(timeInMillsec);
+ ((AMutableTime) value).setValue(timeInMillsec); // mutates the held AMutableTime in place
}
@Override
- public ATypeTag getTypeTag() {
- return ATypeTag.TIME;
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException { // optional TIME tag byte, then the time payload
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.TIME.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ATimeSerializerDeserializer.INSTANCE.serialize((AMutableTime) value, dataOutput);
}
@Override
- public IAObject getIAObject() {
- return time;
- }
-
- @Override
- public String toString() {
- return time.toString();
+ public void reset() {
+ ((AMutableTime) value).setValue(0); // reset to 0
}
}
- public static final class JInterval implements IJObject {
-
- private AMutableInterval interval;
+ public static final class JInterval extends JObject { // Java-side wrapper for an interval; state is the inherited JObject value (an AMutableInterval).
public JInterval(long intervalStart, long intervalEnd) {
- interval = new AMutableInterval(intervalStart, intervalEnd, (byte) 0);
+ super(new AMutableInterval(intervalStart, intervalEnd, (byte) 0)); // interval type byte starts at 0
}
- public void setValue(long intervalStart, long intervalEnd, byte typetag) throws AsterixException {
- try {
- interval.setValue(intervalStart, intervalEnd, typetag);
- } catch (AlgebricksException e) {
- throw new AsterixException(e);
- }
- }
-
- @Override
- public ATypeTag getTypeTag() {
- return ATypeTag.INTERVAL;
- }
-
- @Override
- public IAObject getIAObject() {
- return interval;
- }
-
- @Override
- public String toString() {
- return interval.toString();
+ public void setValue(long intervalStart, long intervalEnd, byte typetag) throws AlgebricksException { // AlgebricksException now propagates unwrapped; callers such as JObjectUtil wrap it
+ ((AMutableInterval) value).setValue(intervalStart, intervalEnd, typetag);
}
public long getIntervalStart() {
- return interval.getIntervalStart();
+ return ((AMutableInterval) value).getIntervalStart();
}
public long getIntervalEnd() {
- return interval.getIntervalEnd();
+ return ((AMutableInterval) value).getIntervalEnd();
}
public short getIntervalType() {
- return interval.getIntervalType();
+ return ((AMutableInterval) value).getIntervalType();
+ }
+ // Writes an optional INTERVAL tag byte, then the interval payload.
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.INTERVAL.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ AIntervalSerializerDeserializer.INSTANCE.serialize(((AMutableInterval) value), dataOutput);
+ }
+ // Resets start, end and interval type to 0; declared to throw AlgebricksException because AMutableInterval.setValue does.
+ @Override
+ public void reset() throws AlgebricksException {
+ ((AMutableInterval) value).setValue(0L, 0L, (byte) 0);
}
}
- public static final class JDate implements IJObject {
-
- private AMutableDate date;
+ public static final class JDate extends JObject { // Java-side wrapper for a date; state is the inherited JObject value (an AMutableDate).
public JDate(int chrononTimeInDays) {
- date = new AMutableDate(chrononTimeInDays);
+ super(new AMutableDate(chrononTimeInDays));
}
public void setValue(int chrononTimeInDays) {
- date.setValue(chrononTimeInDays);
+ ((AMutableDate) value).setValue(chrononTimeInDays);
}
@Override
- public ATypeTag getTypeTag() {
- return ATypeTag.DATE;
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException { // optional DATE tag byte, then the date payload
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.DATE.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ADateSerializerDeserializer.INSTANCE.serialize(((AMutableDate) value), dataOutput);
}
@Override
- public IAObject getIAObject() {
- return date;
+ public void reset() {
+ ((AMutableDate) value).setValue(0); // reset to 0
}
-
- @Override
- public String toString() {
- return date.toString();
- }
-
}
- public static final class JDateTime implements IJObject {
-
- private AMutableDateTime dateTime;
+ public static final class JDateTime extends JObject { // Java-side wrapper for a datetime; state is the inherited JObject value (an AMutableDateTime).
public JDateTime(long chrononTime) {
- dateTime = new AMutableDateTime(chrononTime);
+ super(new AMutableDateTime(chrononTime));
}
public void setValue(long chrononTime) {
- dateTime.setValue(chrononTime);
+ ((AMutableDateTime) value).setValue(chrononTime);
}
@Override
- public ATypeTag getTypeTag() {
- return ATypeTag.DATETIME;
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException { // optional DATETIME tag byte, then the datetime payload
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.DATETIME.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ADateTimeSerializerDeserializer.INSTANCE.serialize(((AMutableDateTime) value), dataOutput);
}
@Override
- public IAObject getIAObject() {
- return dateTime;
- }
-
- @Override
- public String toString() {
- return dateTime.toString();
+ public void reset() {
+ ((AMutableDateTime) value).setValue(0); // reset to 0
}
}
- public static final class JDuration implements IJObject {
-
- private AMutableDuration duration;
+ public static final class JDuration extends JObject { // Java-side wrapper for a duration (months + milliseconds) held as an AMutableDuration.
public JDuration(int months, long milliseconds) {
- duration = new AMutableDuration(months, milliseconds);
+ super(new AMutableDuration(months, milliseconds));
}
public void setValue(int months, long milliseconds) {
- duration.setValue(months, milliseconds);
+ ((AMutableDuration) value).setValue(months, milliseconds);
}
@Override
- public ATypeTag getTypeTag() {
- return ATypeTag.DURATION;
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException { // optional DURATION tag byte, then the duration payload
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.DURATION.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ADurationSerializerDeserializer.INSTANCE.serialize(((AMutableDuration) value), dataOutput);
}
@Override
- public IAObject getIAObject() {
- return duration;
- }
-
- @Override
- public String toString() {
- return duration.toString();
+ public void reset() {
+ ((AMutableDuration) value).setValue(0, 0); // zero months, zero milliseconds
}
}
- public static final class JPolygon implements IJObject {
+ public static final class JPolygon extends JObject {
- private AMutablePolygon polygon;
- private List<JPoint> points;
-
- public JPolygon(List<JPoint> points) {
- this.points = points;
+ public JPolygon(JPoint[] points) {
+ super(new AMutablePolygon(getAPoints(points)));
}
- public void setValue(List<JPoint> points) {
- this.points = points;
- polygon = null;
+ public void setValue(APoint[] points) {
+ ((AMutablePolygon) value).setValue(points);
+ }
+
+ public void setValue(JPoint[] points) {
+ ((AMutablePolygon) value).setValue(getAPoints(points));
+ }
+
+ private static APoint[] getAPoints(JPoint[] jpoints) {
+ APoint[] apoints = new APoint[jpoints.length];
+ int index = 0;
+ for (JPoint jpoint : jpoints) {
+ apoints[index++] = (APoint) jpoint.getIAObject();
+ }
+ return apoints;
}
@Override
@@ -449,35 +652,32 @@
}
@Override
- public IAObject getIAObject() {
- if (polygon == null) {
- APoint[] pts = new APoint[points.size()];
- int index = 0;
- for (JPoint p : points) {
- pts[index++] = (APoint) p.getIAObject();
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.POLYGON.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
}
- polygon = new AMutablePolygon(pts);
}
- return polygon;
+ APolygonSerializerDeserializer.INSTANCE.serialize((AMutablePolygon) value, dataOutput);
}
@Override
- public String toString() {
- return getIAObject().toString();
+ public void reset() {
+ ((AMutablePolygon) value).setValue(null);
}
}
- public static final class JCircle implements IJObject {
-
- private AMutableCircle circle;
+ public static final class JCircle extends JObject {
public JCircle(JPoint center, double radius) {
- circle = new AMutableCircle((APoint) center.getIAObject(), radius);
+ super(new AMutableCircle((APoint) center.getIAObject(), radius));
}
public void setValue(JPoint center, double radius) {
- circle.setValue((APoint) center.getIAObject(), radius);
+ ((AMutableCircle) (value)).setValue((APoint) center.getIAObject(), radius);
}
@Override
@@ -486,56 +686,64 @@
}
@Override
- public IAObject getIAObject() {
- return circle;
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.CIRCLE.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ACircleSerializerDeserializer.INSTANCE.serialize(((AMutableCircle) (value)), dataOutput);
}
@Override
- public String toString() {
- return circle.toString();
+ public void reset() {
}
-
}
- public static final class JLine implements IJObject {
-
- private AMutableLine line;
+ public static final class JLine extends JObject { // Java-side wrapper for a line between two points, held as an AMutableLine in the inherited value.
public JLine(JPoint p1, JPoint p2) {
- line = new AMutableLine((APoint) p1.getIAObject(), (APoint) p2.getIAObject());
+ super(new AMutableLine((APoint) p1.getIAObject(), (APoint) p2.getIAObject()));
}
public void setValue(JPoint p1, JPoint p2) {
- line.setValue((APoint) p1.getIAObject(), (APoint) p2.getIAObject());
+ ((AMutableLine) value).setValue((APoint) p1.getIAObject(), (APoint) p2.getIAObject());
+ }
+ // Overload for endpoints that are already APoint values.
+ public void setValue(APoint p1, APoint p2) {
+ ((AMutableLine) value).setValue(p1, p2);
}
@Override
- public ATypeTag getTypeTag() {
- return ATypeTag.LINE;
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException { // optional LINE tag byte, then the line payload
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.LINE.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ALineSerializerDeserializer.INSTANCE.serialize(((AMutableLine) value), dataOutput);
}
@Override
- public IAObject getIAObject() {
- return line;
- }
+ public void reset() {
+ // NOTE(review): no-op; the endpoints are left unchanged, unlike e.g. JDate.reset() which zeroes its value. Confirm this is intended.
- @Override
- public String toString() {
- return line.toString();
}
}
- public static final class JPoint3D implements IJObject {
-
- private AMutablePoint3D value;
+ public static final class JPoint3D extends JObject {
public JPoint3D(double x, double y, double z) {
- value = new AMutablePoint3D(x, y, z);
+ super(new AMutablePoint3D(x, y, z));
}
public void setValue(double x, double y, double z) {
- value.setValue(x, y, z);
+ ((AMutablePoint3D) value).setValue(x, y, z);
}
public double getXValue() {
@@ -550,40 +758,75 @@
return ((AMutablePoint3D) value).getZ();
}
- public IAObject getValue() {
- return value;
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException { // optional POINT3D tag byte, then the point
+ if (writeTypeTag) {
+ try {
+ dataOutput.writeByte(ATypeTag.POINT3D.serialize());
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ APoint3DSerializerDeserializer.INSTANCE.serialize(((AMutablePoint3D) value), dataOutput);
}
@Override
- public String toString() {
- return value.toString();
- }
+ public void reset() {
+ // No-op: nothing to clear; setValue overwrites the wrapped AMutablePoint3D in place.
- @Override
- public ATypeTag getTypeTag() {
- return ATypeTag.POINT3D;
- }
-
- @Override
- public IAObject getIAObject() {
- return value;
}
}
- public static final class JOrderedList implements IJObject {
+ public static abstract class JList implements IJObject { // element storage shared by JOrderedList and JUnorderedList
+ protected List<IJObject> jObjects; // list elements, in insertion order
- private AOrderedListType listType;
- private List<IJObject> jObjects;
+ public JList() {
+ jObjects = new ArrayList<IJObject>();
+ }
- public JOrderedList(IJObject jObject) {
- this.listType = new AOrderedListType(jObject.getIAObject().getType(), null);
- this.jObjects = new ArrayList<IJObject>();
+ public boolean isEmpty() {
+ return jObjects.isEmpty();
}
public void add(IJObject jObject) {
jObjects.add(jObject);
}
+ public void addAll(Collection<IJObject> jObjectCollection) {
+ jObjects.addAll(jObjectCollection);
+ }
+
+ public void clear() {
+ jObjects.clear();
+ }
+
+ public IJObject getElement(int index) {
+ return jObjects.get(index);
+ }
+
+ public int size() {
+ return jObjects.size();
+ }
+
+ public Iterator<IJObject> iterator() {
+ return jObjects.iterator();
+ }
+ }
+
+ public static final class JOrderedList extends JList {
+
+ private AOrderedListType listType;
+
+ public JOrderedList(IJObject jObject) { // item type taken from a sample element's IAObject
+ super();
+ this.listType = new AOrderedListType(jObject.getIAObject().getType(), null);
+ }
+
+ public JOrderedList(IAType listItemType) { // item type given directly, no sample element needed
+ super();
+ this.listType = new AOrderedListType(listItemType, null);
+ }
+
@Override
public ATypeTag getTypeTag() {
return ATypeTag.ORDEREDLIST;
@@ -602,34 +845,43 @@
return listType;
}
- public void addAll(Collection<IJObject> jObjectCollection) {
- jObjects.addAll(jObjectCollection);
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException {
+ IAsterixListBuilder listBuilder = new edu.uci.ics.asterix.builders.OrderedListBuilder(); // ordered builder to match AOrderedListType
+ listBuilder.reset(listType);
+ ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+ for (IJObject jObject : jObjects) {
+ fieldValue.reset();
+ jObject.serialize(fieldValue.getDataOutput(), true);
+ listBuilder.addItem(fieldValue);
+ }
+ listBuilder.write(dataOutput, writeTypeTag);
+ // items always carry their own type tag; writeTypeTag only governs the list's own tag
}
- public void clear() {
- jObjects.clear();
- }
+ @Override
+ public void reset() {
+ jObjects.clear(); // same reset semantics as JUnorderedList: drop all elements
- public IJObject getElement(int index) {
- return jObjects.get(index);
- }
-
- public int size() {
- return jObjects.size();
}
}
- public static final class JUnorderedList implements IJObject {
+ public static final class JUnorderedList extends JList {
private AUnorderedListType listType;
- private List<IJObject> jObjects;
public JUnorderedList(IJObject jObject) {
this.listType = new AUnorderedListType(jObject.getIAObject().getType(), null);
this.jObjects = new ArrayList<IJObject>();
}
+ public JUnorderedList(IAType listItemType) {
+ super();
+ this.listType = new AUnorderedListType(listItemType, null);
+ // jObjects is already allocated by super()
+ }
+
public void add(IJObject jObject) {
jObjects.add(jObject);
}
@@ -652,60 +904,43 @@
return listType;
}
- public boolean isEmpty() {
- return jObjects.isEmpty();
+ @Override
+ public void serialize(DataOutput dataOutput, boolean writeTypeTag) throws HyracksDataException { // items always tagged; writeTypeTag only affects the list tag
+ IAsterixListBuilder listBuilder = new UnorderedListBuilder();
+ listBuilder.reset(listType);
+ ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+ for (IJObject jObject : jObjects) {
+ fieldValue.reset();
+ jObject.serialize(fieldValue.getDataOutput(), true);
+ listBuilder.addItem(fieldValue);
+ }
+ listBuilder.write(dataOutput, writeTypeTag);
}
- public void addAll(Collection<IJObject> jObjectCollection) {
- jObjects.addAll(jObjectCollection);
- }
-
- public void clear() {
+ @Override
+ public void reset() { // drops all elements
jObjects.clear();
}
- public IJObject getElement(int index) {
- return jObjects.get(index);
- }
-
- public int size() {
- return jObjects.size();
- }
-
}
public static final class JRecord implements IJObject {
private AMutableRecord value;
private ARecordType recordType;
- private List<IJObject> fields;
- private List<String> fieldNames;
- private List<IAType> fieldTypes;
- private int numFieldsAdded = 0;
- private List<Boolean> openField;
-
- public JRecord(ARecordType recordType) {
- this.recordType = recordType;
- this.fields = new ArrayList<IJObject>();
- initFieldInfo();
- }
+ private IJObject[] fields; // closed-part values, indexed like recordType.getFieldNames()
+ private Map<String, IJObject> openFields; // open-part values by field name
public JRecord(ARecordType recordType, IJObject[] fields) {
this.recordType = recordType;
- this.fields = new ArrayList<IJObject>();
- for (IJObject jObject : fields) {
- this.fields.add(jObject);
- }
- initFieldInfo();
+ this.fields = fields; // aliases the caller's array (no copy)
+ this.openFields = new LinkedHashMap<String, IJObject>(); // insertion order = open-field serialization order
}
- public JRecord(String[] fieldNames, IJObject[] fields) throws AsterixException {
- this.recordType = getARecordType(fieldNames, fields);
- this.fields = new ArrayList<IJObject>();
- for (IJObject jObject : fields) {
- this.fields.add(jObject);
- }
- initFieldInfo();
+ public JRecord(ARecordType recordType, IJObject[] fields, LinkedHashMap<String, IJObject> openFields) {
+ this.recordType = recordType;
+ this.fields = fields; // aliases the caller's array (no copy)
+ this.openFields = openFields; // aliases the caller's map (no copy)
}
private ARecordType getARecordType(String[] fieldNames, IJObject[] fields) throws AsterixException {
@@ -723,74 +958,34 @@
return recordType;
}
- private void initFieldInfo() {
- this.openField = new ArrayList<Boolean>();
- fieldNames = new ArrayList<String>();
- for (String name : recordType.getFieldNames()) {
- fieldNames.add(name);
- openField.add(false);
- }
- fieldTypes = new ArrayList<IAType>();
- for (IAType type : recordType.getFieldTypes()) {
- fieldTypes.add(type);
- }
-
- }
-
- private IAObject[] getIAObjectArray(List<IJObject> fields) {
- IAObject[] retValue = new IAObject[fields.size()];
- int index = 0;
- for (IJObject jObject : fields) {
- retValue[index++] = getIAObject(jObject);
- }
- return retValue;
- }
-
- private IAObject getIAObject(IJObject jObject) {
- IAObject retVal = null;
- switch (jObject.getTypeTag()) {
- case RECORD:
- ARecordType recType = ((JRecord) jObject).getRecordType();
- IAObject[] fields = new IAObject[((JRecord) jObject).getFields().size()];
- int index = 0;
- for (IJObject field : ((JRecord) jObject).getFields()) {
- fields[index++] = getIAObject(field);
- }
- retVal = new AMutableRecord(recType, fields);
- default:
- retVal = jObject.getIAObject();
- break;
- }
- return retVal;
- }
-
- public void addField(String fieldName, IJObject fieldValue) {
+ public void addField(String fieldName, IJObject fieldValue) throws AsterixException { // adds to the open part; name must be new in both parts
int pos = getFieldPosByName(fieldName);
if (pos >= 0) {
- throw new IllegalArgumentException("field already defined");
+ throw new AsterixException("field already defined in closed part");
}
- numFieldsAdded++;
- fields.add(fieldValue);
- fieldNames.add(fieldName);
- fieldTypes.add(fieldValue.getIAObject().getType());
- openField.add(true);
+ if (openFields.get(fieldName) != null) {
+ throw new AsterixException("field already defined in open part");
+ }
+ openFields.put(fieldName, fieldValue);
}
public IJObject getValueByName(String fieldName) throws AsterixException, IOException {
+ // check closed part first (declared fields, looked up by position)
int fieldPos = getFieldPosByName(fieldName);
- if (fieldPos < 0) {
- throw new AsterixException("unknown field: " + fieldName);
+ if (fieldPos >= 0) {
+ return fields[fieldPos];
+ } else {
+ // then the open part (fields added via addField)
+ IJObject fieldValue = openFields.get(fieldName);
+ if (fieldValue == null) {
+ throw new AsterixException("unknown field: " + fieldName);
+ }
+ return fieldValue;
}
- return fields.get(fieldPos);
}
- public void setValueAtPos(int pos, IJObject jtype) {
- fields.set(pos, jtype);
- }
-
- public void setValue(AMutableRecord mutableRecord) {
- this.value = mutableRecord;
- this.recordType = mutableRecord.getType();
+ public void setValueAtPos(int pos, IJObject jObject) { // pos indexes the closed part; no bounds check
+ fields[pos] = jObject;
}
@Override
@@ -798,16 +993,22 @@
return recordType.getTypeTag();
}
- public void setField(String fieldName, IJObject fieldValue) {
+ public void setField(String fieldName, IJObject fieldValue) throws AsterixException { // replaces an existing closed/open field; never adds one
int pos = getFieldPosByName(fieldName);
- fields.set(pos, fieldValue);
- if (value != null) {
- value.setValueAtPos(pos, fieldValue.getIAObject());
+ if (pos >= 0) {
+ fields[pos] = fieldValue;
+ } else {
+ if (openFields.get(fieldName) != null) {
+ openFields.put(fieldName, fieldValue);
+ } else {
+ throw new AsterixException("unknown field: " + fieldName);
+ }
}
}
private int getFieldPosByName(String fieldName) {
int index = 0;
+ String[] fieldNames = recordType.getFieldNames(); // linear scan over the declared (closed) field names
for (String name : fieldNames) {
if (name.equals(fieldName)) {
return index;
@@ -821,40 +1022,75 @@
return recordType;
}
- public List<IJObject> getFields() {
+ public IJObject[] getFields() {
return fields;
}
+ public RecordBuilder getRecordBuilder() {
+ RecordBuilder recordBuilder = new RecordBuilder();
+ recordBuilder.reset(recordType);
+ return recordBuilder;
+ }
+
+ public void serialize(DataOutput output, boolean writeTypeTag) throws HyracksDataException { // closed fields by position, then open fields by name
+ RecordBuilder recordBuilder = new RecordBuilder();
+ recordBuilder.reset(recordType);
+ int index = 0;
+ ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
+ for (IJObject jObject : fields) {
+ fieldValue.reset();
+ jObject.serialize(fieldValue.getDataOutput(), true); // always tagged (as for open values below); the record's own tag is writeTypeTag's job
+ recordBuilder.addField(index, fieldValue);
+ index++;
+ }
+ // open part: each name is written as a tagged string, each value as a tagged object
+ try {
+ if (openFields != null && !openFields.isEmpty()) {
+ ArrayBackedValueStorage openFieldName = new ArrayBackedValueStorage();
+ ArrayBackedValueStorage openFieldValue = new ArrayBackedValueStorage();
+ AMutableString nameValue = new AMutableString(""); // reused for every open-field name in this call
+ for (Entry<String, IJObject> entry : openFields.entrySet()) {
+ openFieldName.reset();
+ openFieldValue.reset();
+ nameValue.setValue(entry.getKey());
+ openFieldName.getDataOutput().write(ATypeTag.STRING.serialize());
+ AStringSerializerDeserializer.INSTANCE.serialize(nameValue, openFieldName.getDataOutput());
+ entry.getValue().serialize(openFieldValue.getDataOutput(), true);
+ recordBuilder.addField(openFieldName, openFieldValue);
+ }
+ }
+ } catch (IOException | AsterixException ae) {
+ throw new HyracksDataException(ae);
+ }
+ try {
+ recordBuilder.write(output, writeTypeTag);
+ } catch (IOException | AsterixException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
@Override
public IAObject getIAObject() {
- if (value == null || numFieldsAdded > 0) {
- value = new AMutableRecord(recordType, getIAObjectArray(fields));
- }
return value;
}
- public void close() {
- if (numFieldsAdded > 0) {
- int totalFields = fieldNames.size();
- for (int i = 0; i < numFieldsAdded; i++) {
- fieldNames.remove(totalFields - 1 - i);
- fieldTypes.remove(totalFields - 1 - i);
- fields.remove(totalFields - 1 - i);
+ public void reset() throws AlgebricksException {
+ if (openFields != null && !openFields.isEmpty()) {
+ openFields.clear();
+ }
+ if (fields != null) {
+ for (IJObject field : fields) {
+ if (field != null) {
+ field.reset();
+ }
+ }
}
- numFieldsAdded = 0;
}
}
- public List<Boolean> getOpenField() {
- return openField;
- }
-
- public List<String> getFieldNames() {
- return fieldNames;
- }
-
- public List<IAType> getFieldTypes() {
- return fieldTypes;
+ public void reset(IJObject[] fields, LinkedHashMap<String, IJObject> openFields) throws AlgebricksException {
+ this.reset();
+ this.fields = fields;
+ this.openFields = openFields;
}
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/Datatypes.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/Datatypes.java
new file mode 100644
index 0000000..e126bf6
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/Datatypes.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.asterix.external.util;
+
+public class Datatypes { // field-name constants for tweet records
+
+ public static final class Tweet { // fields of an incoming tweet record (USER is a nested record holding SCREEN_NAME)
+ public static final String ID = "id";
+ public static final String USER = "user";
+ public static final String MESSAGE = "message_text";
+ public static final String LATITUDE = "latitude";
+ public static final String LONGITUDE = "longitude";
+ public static final String CREATED_AT = "created_at";
+ public static final String SCREEN_NAME = "screen_name";
+ public static final String COUNTRY = "country";
+ }
+
+ public static final class ProcessedTweet { // extra fields written into a processed tweet's output record
+ public static final String USER_NAME = "user_name";
+ public static final String LOCATION = "location";
+ public static final String TOPICS = "topics";
+ }
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
new file mode 100644
index 0000000..092d715
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TweetProcessor.java
@@ -0,0 +1,112 @@
+package edu.uci.ics.asterix.external.util;
+
+import java.util.regex.Pattern;
+
+import twitter4j.GeoLocation;
+import twitter4j.Status;
+import twitter4j.User;
+import edu.uci.ics.asterix.om.base.AMutableDouble;
+import edu.uci.ics.asterix.om.base.AMutableInt32;
+import edu.uci.ics.asterix.om.base.AMutableRecord;
+import edu.uci.ics.asterix.om.base.AMutableString;
+import edu.uci.ics.asterix.om.base.IAObject;
+import edu.uci.ics.asterix.om.types.ARecordType;
+
+/**
+ * Converts twitter4j {@link Status} objects into AsterixDB records.
+ * <p>
+ * One {@link AMutableRecord} (with a nested mutable user record) is allocated up front and is
+ * overwritten by every call to {@link #processNextTweet(Status)}; callers must consume the returned
+ * record before processing the next tweet.
+ */
+public class TweetProcessor {
+
+    /** Any character outside 7-bit ASCII; compiled once instead of on every call. */
+    private static final Pattern NON_ASCII = Pattern.compile("[^\\x00-\\x7F]");
+
+    /** Number of fields filled in both the tweet record and the nested user record. */
+    private static final int NUM_FIELDS = 6;
+
+    private IAObject[] mutableTweetFields;
+    private IAObject[] mutableUserFields;
+    private AMutableRecord mutableRecord;
+    private AMutableRecord mutableUser;
+
+    /**
+     * @param recordType
+     *            type of the produced tweet record. Fields are filled by position: 0 id (string),
+     *            1 user (record), 2 latitude and 3 longitude (doubles), 4 created-at and 5 text
+     *            (strings). The type at position 1 must be an {@link ARecordType}; its six fields are
+     *            filled with screen name, lang, friends count, statuses count, name, followers count.
+     */
+    public TweetProcessor(ARecordType recordType) {
+        mutableUserFields = new IAObject[] { new AMutableString(null), new AMutableString(null), new AMutableInt32(0),
+                new AMutableInt32(0), new AMutableString(null), new AMutableInt32(0) };
+        mutableUser = new AMutableRecord((ARecordType) recordType.getFieldTypes()[1], mutableUserFields);
+
+        mutableTweetFields = new IAObject[] { new AMutableString(null), mutableUser, new AMutableDouble(0),
+                new AMutableDouble(0), new AMutableString(null), new AMutableString(null) };
+        mutableRecord = new AMutableRecord(recordType, mutableTweetFields);
+    }
+
+    /**
+     * Overwrites the shared mutable record with the contents of {@code tweet}.
+     *
+     * @param tweet
+     *            the status to convert; a missing geo-location is recorded as (0, 0)
+     * @return the shared mutable record (the same instance on every call)
+     */
+    public AMutableRecord processNextTweet(Status tweet) {
+        User user = tweet.getUser();
+        ((AMutableString) mutableUserFields[0]).setValue(getNormalizedString(user.getScreenName()));
+        ((AMutableString) mutableUserFields[1]).setValue(getNormalizedString(user.getLang()));
+        ((AMutableInt32) mutableUserFields[2]).setValue(user.getFriendsCount());
+        ((AMutableInt32) mutableUserFields[3]).setValue(user.getStatusesCount());
+        ((AMutableString) mutableUserFields[4]).setValue(getNormalizedString(user.getName()));
+        ((AMutableInt32) mutableUserFields[5]).setValue(user.getFollowersCount());
+
+        ((AMutableString) mutableTweetFields[0]).setValue(tweet.getId() + "");
+
+        for (int i = 0; i < NUM_FIELDS; i++) {
+            mutableUser.setValueAtPos(i, mutableUserFields[i]);
+        }
+        GeoLocation geoLocation = tweet.getGeoLocation();
+        if (geoLocation != null) {
+            ((AMutableDouble) mutableTweetFields[2]).setValue(geoLocation.getLatitude());
+            ((AMutableDouble) mutableTweetFields[3]).setValue(geoLocation.getLongitude());
+        } else {
+            ((AMutableDouble) mutableTweetFields[2]).setValue(0);
+            ((AMutableDouble) mutableTweetFields[3]).setValue(0);
+        }
+        String createdAt = tweet.getCreatedAt() == null ? null : tweet.getCreatedAt().toString();
+        ((AMutableString) mutableTweetFields[4]).setValue(getNormalizedString(createdAt));
+        ((AMutableString) mutableTweetFields[5]).setValue(getNormalizedString(tweet.getText()));
+
+        for (int i = 0; i < NUM_FIELDS; i++) {
+            mutableRecord.setValueAtPos(i, mutableTweetFields[i]);
+        }
+
+        return mutableRecord;
+    }
+
+    /**
+     * Strips non-ASCII characters, replaces newlines with spaces and trims the result.
+     *
+     * @param originalString
+     *            the text to normalize; may be {@code null}
+     * @return the normalized text, or the empty string for {@code null} input
+     */
+    public static String getNormalizedString(String originalString) {
+        if (originalString == null) {
+            return "";
+        }
+        String asciiText = NON_ASCII.matcher(originalString).replaceAll("").replace('\n', ' ');
+        return asciiText.trim();
+    }
+
+    /** @return the shared mutable record that {@link #processNextTweet(Status)} fills in */
+    public AMutableRecord getMutableRecord() {
+        return mutableRecord;
+    }
+
+}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
new file mode 100644
index 0000000..bd1d75c
--- /dev/null
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/util/TwitterUtil.java
@@ -0,0 +1,180 @@
+package edu.uci.ics.asterix.external.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import twitter4j.FilterQuery;
+import twitter4j.Twitter;
+import twitter4j.TwitterFactory;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.conf.ConfigurationBuilder;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+
+/**
+ * Helpers for building twitter4j clients and stream filters from a feed adapter configuration map.
+ */
+public class TwitterUtil {
+
+    public static class ConfigurationConstants {
+        public static final String KEY_LOCATION = "location";
+        public static final String LOCATION_US = "US";
+    }
+
+    public static class GeoConstants {
+        // Must be declared before boundingBoxes: initializeBoundingBoxes() reads it during class init.
+        public static final double[][] US = new double[][] { { -124.848974, 24.396308 }, { -66.885444, 49.384358 } };
+        public static Map<String, double[][]> boundingBoxes = initializeBoundingBoxes();
+    }
+
+    /** Maps each named location (see {@link ConfigurationConstants}) to its bounding box. */
+    private static Map<String, double[][]> initializeBoundingBoxes() {
+        Map<String, double[][]> boundingBoxes = new HashMap<String, double[][]>();
+        boundingBoxes.put(ConfigurationConstants.LOCATION_US, GeoConstants.US);
+        return boundingBoxes;
+    }
+
+    /**
+     * Builds a stream filter from the {@code location} configuration entry.
+     *
+     * @param configuration
+     *            adapter configuration; {@code location} is either a named region (a key of
+     *            {@link GeoConstants#boundingBoxes}) or four comma-separated numbers read row by row
+     *            into a 2x2 bounding box
+     * @return the filter, or {@code null} if no location is configured or the region name is unknown
+     * @throws AsterixException
+     *             if the coordinate list does not hold exactly four numbers
+     */
+    public static FilterQuery getFilterQuery(Map<String, String> configuration) throws AsterixException {
+        String locationValue = configuration.get(ConfigurationConstants.KEY_LOCATION);
+        if (locationValue == null) {
+            return null;
+        }
+        double[][] locations = locationValue.contains(",") ? parseBoundingBox(locationValue)
+                : GeoConstants.boundingBoxes.get(locationValue);
+        if (locations == null) {
+            return null;
+        }
+        FilterQuery filterQuery = new FilterQuery();
+        filterQuery.locations(locations);
+        return filterQuery;
+    }
+
+    /** Parses exactly four comma-separated numbers into a 2x2 array, filled row by row. */
+    private static double[][] parseBoundingBox(String locationValue) throws AsterixException {
+        String[] coordinates = locationValue.trim().split(",");
+        if (coordinates.length != 4) {
+            throw new AsterixException("Expected 4 comma-separated coordinates but found "
+                    + coordinates.length + " in " + locationValue);
+        }
+        double[][] locations = new double[2][2];
+        for (int i = 0; i < 2; i++) {
+            for (int j = 0; j < 2; j++) {
+                String coordinate = coordinates[2 * i + j];
+                try {
+                    locations[i][j] = Double.parseDouble(coordinate);
+                } catch (NumberFormatException ne) {
+                    throw new AsterixException("Incorrect coordinate value " + coordinate, ne);
+                }
+            }
+        }
+        return locations;
+    }
+
+    public static Twitter getTwitterService(Map<String, String> configuration) {
+        ConfigurationBuilder cb = getAuthConfiguration(configuration);
+        TwitterFactory tf = new TwitterFactory(cb.build());
+        Twitter twitter = tf.getInstance();
+        return twitter;
+    }
+
+    public static TwitterStream getTwitterStream(Map<String, String> configuration) {
+        ConfigurationBuilder cb = getAuthConfiguration(configuration);
+        TwitterStreamFactory factory = new TwitterStreamFactory(cb.build());
+        return factory.getInstance();
+    }
+
+    /** Copies the four OAuth credentials from {@code configuration} into a new twitter4j builder. */
+    private static ConfigurationBuilder getAuthConfiguration(Map<String, String> configuration) {
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        cb.setDebugEnabled(true);
+        cb.setOAuthConsumerKey(configuration.get(AuthenticationConstants.OAUTH_CONSUMER_KEY));
+        cb.setOAuthConsumerSecret(configuration.get(AuthenticationConstants.OAUTH_CONSUMER_SECRET));
+        cb.setOAuthAccessToken(configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN));
+        cb.setOAuthAccessTokenSecret(configuration.get(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET));
+        return cb;
+    }
+
+    /**
+     * Ensures {@code configuration} carries the OAuth credentials. In {@code file} mode (the default)
+     * they are loaded from the classpath resource named by {@code authentication.file} (default
+     * {@link AuthenticationConstants#DEFAULT_AUTH_FILE}); in {@code explicit} mode the configuration
+     * is left untouched.
+     *
+     * @throws AsterixException
+     *             if the credentials file cannot be found or read
+     */
+    public static void initializeConfigurationWithAuthInfo(Map<String, String> configuration) throws AsterixException {
+        String authMode = configuration.get(AuthenticationConstants.AUTHENTICATION_MODE);
+        if (authMode == null) {
+            authMode = AuthenticationConstants.AUTHENTICATION_MODE_FILE;
+        }
+        try {
+            switch (authMode) {
+                case AuthenticationConstants.AUTHENTICATION_MODE_FILE:
+                    loadAuthInfoFromFile(configuration);
+                    break;
+                case AuthenticationConstants.AUTHENTICATION_MODE_EXPLICIT:
+                    break;
+            }
+        } catch (Exception e) {
+            throw new AsterixException("Incorrect configuration! unable to load authentication credentials "
+                    + e.getMessage(), e);
+        }
+    }
+
+    /** Reads the four OAuth credentials from a classpath properties file into {@code configuration}. */
+    private static void loadAuthInfoFromFile(Map<String, String> configuration) throws IOException {
+        String authFile = configuration.get(AuthenticationConstants.OAUTH_AUTHENTICATION_FILE);
+        if (authFile == null) {
+            authFile = AuthenticationConstants.DEFAULT_AUTH_FILE;
+        }
+        Properties prop = new Properties();
+        // try-with-resources closes the stream even when load() fails (a null resource is skipped)
+        try (InputStream in = TwitterUtil.class.getResourceAsStream(authFile)) {
+            if (in == null) {
+                throw new IOException("authentication file not found on classpath: " + authFile);
+            }
+            prop.load(in);
+        }
+        copyProperty(prop, configuration, AuthenticationConstants.OAUTH_CONSUMER_KEY);
+        copyProperty(prop, configuration, AuthenticationConstants.OAUTH_CONSUMER_SECRET);
+        copyProperty(prop, configuration, AuthenticationConstants.OAUTH_ACCESS_TOKEN);
+        copyProperty(prop, configuration, AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
+    }
+
+    private static void copyProperty(Properties prop, Map<String, String> configuration, String key) {
+        configuration.put(key, prop.getProperty(key));
+    }
+
+    public static final class AuthenticationConstants {
+        public static final String OAUTH_CONSUMER_KEY = "consumer.key";
+        public static final String OAUTH_CONSUMER_SECRET = "consumer.secret";
+        public static final String OAUTH_ACCESS_TOKEN = "access.token";
+        public static final String OAUTH_ACCESS_TOKEN_SECRET = "access.token.secret";
+        public static final String OAUTH_AUTHENTICATION_FILE = "authentication.file";
+        public static final String AUTHENTICATION_MODE = "authentication.mode";
+        public static final String AUTHENTICATION_MODE_FILE = "file";
+        public static final String AUTHENTICATION_MODE_EXPLICIT = "explicit";
+        public static final String DEFAULT_AUTH_FILE = "/feed/twitter/auth.properties"; // default authentication file
+    }
+
+    public static final class SearchAPIConstants {
+        public static final String QUERY = "query";
+        public static final String INTERVAL = "interval";
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
new file mode 100644
index 0000000..90fddcd
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.IExternalScalarFunction;
+import edu.uci.ics.asterix.external.library.IFunctionFactory;
+
+public class AddHashTagsFactory implements IFunctionFactory { // factory for the copy-based AddHashTagsFunction
+
+ @Override
+ public IExternalScalarFunction getExternalFunction() { // a new function instance on every call
+ return new AddHashTagsFunction();
+ }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
new file mode 100644
index 0000000..93a87f5
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsFunction.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JDouble;
+import edu.uci.ics.asterix.external.library.java.JObjects.JPoint;
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+/**
+ * Test UDF that builds a processed tweet: copies selected input fields into the result record and
+ * adds the hashtags found in the tweet text as an unordered list of strings.
+ */
+public class AddHashTagsFunction implements IExternalScalarFunction {
+
+    /** Hashtag list; allocated once and cleared on every call. */
+    private JUnorderedList list = null;
+    /** Location; allocated once and overwritten on every call. */
+    private JPoint location = null;
+
+    @Override
+    public void initialize(IFunctionHelper functionHelper) {
+        list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+        location = new JPoint(0, 0);
+    }
+
+    @Override
+    public void deinitialize() {
+    }
+
+    @Override
+    public void evaluate(IFunctionHelper functionHelper) throws Exception {
+        list.clear();
+        JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+        JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+        JDouble latitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LATITUDE);
+        JDouble longitude = (JDouble) inputRecord.getValueByName(Datatypes.Tweet.LONGITUDE);
+
+        if (latitude != null && longitude != null) {
+            location.setValue(latitude.getValue(), longitude.getValue());
+        } else {
+            // reset, otherwise the previous tweet's coordinates would be reported for this one
+            location.setValue(0, 0);
+        }
+        String[] tokens = text.getValue().split(" ");
+        for (String tk : tokens) {
+            if (tk.startsWith("#")) {
+                JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
+                newField.setValue(tk);
+                list.add(newField);
+            }
+        }
+
+        JRecord outputRecord = (JRecord) functionHelper.getResultObject();
+        outputRecord.setField(Datatypes.Tweet.ID, inputRecord.getValueByName(Datatypes.Tweet.ID));
+
+        JRecord userRecord = (JRecord) inputRecord.getValueByName(Datatypes.Tweet.USER);
+        outputRecord.setField(Datatypes.ProcessedTweet.USER_NAME,
+                userRecord.getValueByName(Datatypes.Tweet.SCREEN_NAME));
+
+        outputRecord.setField(Datatypes.ProcessedTweet.LOCATION, location);
+        outputRecord.setField(Datatypes.Tweet.CREATED_AT, inputRecord.getValueByName(Datatypes.Tweet.CREATED_AT));
+        outputRecord.setField(Datatypes.Tweet.MESSAGE, text);
+        outputRecord.setField(Datatypes.ProcessedTweet.TOPICS, list);
+
+        // The input record is not modified; the hashtags are attached to the result record only.
+        functionHelper.setResult(outputRecord);
+    }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
new file mode 100644
index 0000000..a2bec69
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+
+public class AddHashTagsInPlaceFactory implements IFunctionFactory { // factory for the in-place AddHashTagsInPlaceFunction
+
+ @Override
+ public IExternalScalarFunction getExternalFunction() { // a new function instance on every call
+ return new AddHashTagsInPlaceFunction();
+ }
+
+}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
new file mode 100644
index 0000000..a3f6f702
--- /dev/null
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/AddHashTagsInPlaceFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.external.library;
+
+import edu.uci.ics.asterix.external.library.java.JObjects.JRecord;
+import edu.uci.ics.asterix.external.library.java.JObjects.JString;
+import edu.uci.ics.asterix.external.library.java.JObjects.JUnorderedList;
+import edu.uci.ics.asterix.external.library.java.JTypeTag;
+import edu.uci.ics.asterix.external.util.Datatypes;
+
+public class AddHashTagsInPlaceFunction implements IExternalScalarFunction { // adds a hashtag list to the input record and returns that record
+
+ private JUnorderedList list = null; // allocated in initialize(), cleared on every evaluate()
+
+ @Override
+ public void initialize(IFunctionHelper functionHelper) {
+ list = new JUnorderedList(functionHelper.getObject(JTypeTag.STRING));
+ }
+
+ @Override
+ public void deinitialize() {
+ }
+
+ @Override
+ public void evaluate(IFunctionHelper functionHelper) throws Exception {
+ list.clear();
+ JRecord inputRecord = (JRecord) functionHelper.getArgument(0);
+ JString text = (JString) inputRecord.getValueByName(Datatypes.Tweet.MESSAGE);
+
+ String[] tokens = text.getValue().split(" "); // hashtags = space-separated tokens starting with '#'
+ for (String tk : tokens) {
+ if (tk.startsWith("#")) {
+ JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
+ newField.setValue(tk);
+ list.add(newField);
+ }
+ }
+ inputRecord.addField(Datatypes.ProcessedTweet.TOPICS, list); // throws if "topics" is already a field of the record
+ functionHelper.setResult(inputRecord);
+ }
+
+}
\ No newline at end of file
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
index 58995c2..ec04541 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/UpperCaseFunction.java
@@ -53,9 +53,6 @@
JRecord result = (JRecord) functionHelper.getResultObject();
result.setField("id", id);
result.setField("text", text);
- JString newField = (JString) functionHelper.getObject(JTypeTag.STRING);
- newField.setValue(text.getValue().substring(random.nextInt(text.getValue().length())));
- result.addField("substring", newField);
functionHelper.setResult(result);
}
}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
index 07f1a40..7a2597e 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapter.java
@@ -23,8 +23,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IFeedAdapter.DataExchangeMode;
import edu.uci.ics.asterix.external.dataset.adapter.StreamBasedAdapter;
-import edu.uci.ics.asterix.metadata.feeds.IFeedAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -43,8 +44,8 @@
private DummyGenerator generator;
public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
- Map<String, String> configuration) throws IOException {
- super(parserFactory, sourceDatatype, ctx);
+ Map<String, String> configuration, int partition) throws IOException {
+ super(parserFactory, sourceDatatype, ctx, partition);
pos = new PipedOutputStream();
pis = new PipedInputStream(pos);
this.configuration = configuration;
@@ -131,4 +132,9 @@
generator.stop();
}
+ @Override
+ public boolean handleException(Exception e) {
+ return false;
+ }
+
}
diff --git a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
index b042e9c..5416ce2 100644
--- a/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterix-external-data/src/test/java/edu/uci/ics/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -17,19 +17,23 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.metadata.feeds.IDatasourceAdapter;
-import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
+import edu.uci.ics.asterix.common.feeds.api.IDatasourceAdapter;
+import edu.uci.ics.asterix.common.feeds.api.IIntakeProgressTracker;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter.AdapterType;
+import edu.uci.ics.asterix.metadata.external.IAdapterFactory.SupportedOperation;
+import edu.uci.ics.asterix.metadata.feeds.IFeedAdapterFactory;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory;
+import edu.uci.ics.asterix.runtime.operators.file.AsterixTupleParserFactory.InputDataFormat;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-public class TestTypedAdapterFactory implements ITypedAdapterFactory {
+public class TestTypedAdapterFactory implements IFeedAdapterFactory {
/**
*
@@ -38,7 +42,7 @@
public static final String NAME = "test_typed_adapter";
- private static ARecordType adapterOutputType = initOutputType();
+ private ARecordType outputType;
public static final String KEY_NUM_OUTPUT_RECORDS = "num_output_records";
@@ -50,7 +54,7 @@
}
private static ARecordType initOutputType() {
- String[] fieldNames = new String[] { "tweetid", "message-text" };
+ String[] fieldNames = new String[] { "id", "message-text" };
IAType[] fieldTypes = new IAType[] { BuiltinType.AINT64, BuiltinType.ASTRING };
ARecordType outputType = null;
try {
@@ -67,29 +71,36 @@
}
@Override
- public AdapterType getAdapterType() {
- return AdapterType.TYPED;
- }
-
- @Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
return new AlgebricksCountPartitionConstraint(1);
}
@Override
public IDatasourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws Exception {
- ITupleParserFactory tupleParserFactory = new AdmSchemafullRecordParserFactory(adapterOutputType);
- return new TestTypedAdapter(tupleParserFactory, adapterOutputType, ctx, configuration);
+ ITupleParserFactory tupleParserFactory = new AsterixTupleParserFactory(configuration, outputType,
+ InputDataFormat.ADM);
+ return new TestTypedAdapter(tupleParserFactory, outputType, ctx, configuration, partition);
}
@Override
public ARecordType getAdapterOutputType() {
- return adapterOutputType;
+ return outputType;
}
@Override
- public void configure(Map<String, String> configuration) throws Exception {
+ public void configure(Map<String, String> configuration, ARecordType outputType) throws Exception {
this.configuration = configuration;
+ this.outputType = outputType;
+ }
+
+ @Override
+ public boolean isRecordTrackingEnabled() {
+ return false;
+ }
+
+ @Override
+ public IIntakeProgressTracker createIntakeProgressTracker() {
+ return null;
}
}
diff --git a/asterix-external-data/src/test/resources/text_functions.xml b/asterix-external-data/src/test/resources/library_descriptor.xml
similarity index 75%
rename from asterix-external-data/src/test/resources/text_functions.xml
rename to asterix-external-data/src/test/resources/library_descriptor.xml
index 8c7a92c..e35288f 100644
--- a/asterix-external-data/src/test/resources/text_functions.xml
+++ b/asterix-external-data/src/test/resources/library_descriptor.xml
@@ -11,6 +11,22 @@
</libraryFunction>
<libraryFunction>
<function_type>SCALAR</function_type>
+ <name>addHashTags</name>
+ <arguments>Tweet</arguments>
+ <return_type>ProcessedTweet</return_type>
+ <definition>edu.uci.ics.asterix.external.library.AddHashTagsFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
+ <name>addHashTagsInPlace</name>
+ <arguments>Tweet</arguments>
+ <return_type>ProcessedTweet</return_type>
+ <definition>edu.uci.ics.asterix.external.library.AddHashTagsInPlaceFactory
+ </definition>
+ </libraryFunction>
+ <libraryFunction>
+ <function_type>SCALAR</function_type>
<name>mysum</name>
<arguments>AINT32,AINT32</arguments>
<return_type>AINT32</return_type>
@@ -53,7 +69,8 @@
<libraryAdapters>
<libraryAdapter>
<name>test_typed_adapter</name>
- <factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory</factory_class>
+ <factory_class>edu.uci.ics.asterix.external.library.adapter.TestTypedAdapterFactory
+ </factory_class>
</libraryAdapter>
</libraryAdapters>
</externalLibrary>