1) Modified HDFS adapter to use the Hadoop API for constructing a record reader to parse HDFSInputStream 2) Modified other adapters to swithc to a model where the parser instance is not shared across multiple instances 3) enabled HDFS cluster test case
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@137 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 7185e92..dd6a1be 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -14,4 +14,3 @@
fuzzyjoin/events-users-aqlplus_1.aql
fuzzyjoin
failure/q1_pricing_summary_report_failure.aql
-dml/load
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index dd153ea..4544288 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -46,8 +46,6 @@
protected IHyracksTaskContext ctx;
- protected transient IDataParser dataParser;
-
protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
protected static final HashMap<String, String> formatToParserMap = new HashMap<String, String>();
@@ -96,14 +94,6 @@
configuration.put(property, value);
}
- public IDataParser getParser() {
- return dataParser;
- }
-
- public void setParser(IDataParser dataParser) {
- this.dataParser = dataParser;
- }
-
public String getAdapterProperty(String attribute) {
return configuration.get(attribute);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 0673a24..4d969e4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -15,106 +15,97 @@
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
-public class CNNFeedAdapter extends RSSFeedAdapter implements
- IDatasourceAdapter, IMutableFeedAdapter {
+public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IMutableFeedAdapter {
- private List<String> feedURLs = new ArrayList<String>();
- private String id_prefix = "";
+ private List<String> feedURLs = new ArrayList<String>();
+ private String id_prefix = "";
- public static final String KEY_RSS_URL = "topic";
- public static final String KEY_INTERVAL = "interval";
+ public static final String KEY_RSS_URL = "topic";
+ public static final String KEY_INTERVAL = "interval";
- private static Map<String, String> topicFeeds = new HashMap<String, String>();
+ private static Map<String, String> topicFeeds = new HashMap<String, String>();
- public static final String TOP_STORIES = "topstories";
- public static final String WORLD = "world";
- public static final String US = "us";
- public static final String SPORTS = "sports";
- public static final String BUSINESS = "business";
- public static final String POLITICS = "politics";
- public static final String CRIME = "crime";
- public static final String TECHNOLOGY = "technology";
- public static final String HEALTH = "health";
- public static final String ENTERNTAINMENT = "entertainemnt";
- public static final String TRAVEL = "travel";
- public static final String LIVING = "living";
- public static final String VIDEO = "video";
- public static final String STUDENT = "student";
- public static final String POPULAR = "popular";
- public static final String RECENT = "recent";
+ public static final String TOP_STORIES = "topstories";
+ public static final String WORLD = "world";
+ public static final String US = "us";
+ public static final String SPORTS = "sports";
+ public static final String BUSINESS = "business";
+ public static final String POLITICS = "politics";
+ public static final String CRIME = "crime";
+ public static final String TECHNOLOGY = "technology";
+ public static final String HEALTH = "health";
+ public static final String ENTERNTAINMENT = "entertainemnt";
+ public static final String TRAVEL = "travel";
+ public static final String LIVING = "living";
+ public static final String VIDEO = "video";
+ public static final String STUDENT = "student";
+ public static final String POPULAR = "popular";
+ public static final String RECENT = "recent";
- private void initTopics() {
- topicFeeds
- .put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
- topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
- topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
- topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
- topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
- topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
- topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
- topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
- topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
- topicFeeds
- .put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
- topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
- topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
- topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
- topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
- topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
- topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
- }
+ private void initTopics() {
+ topicFeeds.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
+ topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
+ topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
+ topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
+ topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
+ topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
+ topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
+ topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
+ topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
+ topicFeeds.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
+ topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
+ topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
+ topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
+ topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
+ topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
+ topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
+ }
- @Override
- public IDataParser getDataParser(int partition) throws Exception {
- if (dataParser == null) {
- dataParser = new ManagedDelimitedDataStreamParser();
- dataParser.configure(configuration);
- dataParser.initialize((ARecordType) atype, ctx);
- RSSFeedClient feedClient = new RSSFeedClient(this,
- feedURLs.get(partition), id_prefix);
- FeedStream feedStream = new FeedStream(feedClient, ctx);
- ((IDataStreamParser) dataParser).setInputStream(feedStream);
- }
- return dataParser;
- }
+ @Override
+ public IDataParser getDataParser(int partition) throws Exception {
+ IDataParser dataParser = new ManagedDelimitedDataStreamParser();
+ dataParser.configure(configuration);
+ dataParser.initialize((ARecordType) atype, ctx);
+ RSSFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
+ FeedStream feedStream = new FeedStream(feedClient, ctx);
+ ((IDataStreamParser) dataParser).setInputStream(feedStream);
+ return dataParser;
+ }
- @Override
- public void configure(Map<String, String> arguments, IAType atype)
- throws Exception {
- configuration = arguments;
- this.atype = atype;
- String rssURLProperty = configuration.get(KEY_RSS_URL);
- if (rssURLProperty == null) {
- throw new IllegalArgumentException("no rss url provided");
- }
- initializeFeedURLs(rssURLProperty);
- configurePartitionConstraints();
+ @Override
+ public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+ configuration = arguments;
+ this.atype = atype;
+ String rssURLProperty = configuration.get(KEY_RSS_URL);
+ if (rssURLProperty == null) {
+ throw new IllegalArgumentException("no rss url provided");
+ }
+ initializeFeedURLs(rssURLProperty);
+ configurePartitionConstraints();
- }
+ }
- private void initializeFeedURLs(String rssURLProperty) {
- feedURLs.clear();
- String[] rssTopics = rssURLProperty.split(",");
- initTopics();
- for (String topic : rssTopics) {
- String feedURL = topicFeeds.get(topic);
- if (feedURL == null) {
- throw new IllegalArgumentException(" unknown topic :" + topic
- + " please choose from the following "
- + getValidTopics());
- }
- feedURLs.add(feedURL);
- }
- }
+ private void initializeFeedURLs(String rssURLProperty) {
+ feedURLs.clear();
+ String[] rssTopics = rssURLProperty.split(",");
+ initTopics();
+ for (String topic : rssTopics) {
+ String feedURL = topicFeeds.get(topic);
+ if (feedURL == null) {
+ throw new IllegalArgumentException(" unknown topic :" + topic + " please choose from the following "
+ + getValidTopics());
+ }
+ feedURLs.add(feedURL);
+ }
+ }
- private static String getValidTopics() {
- StringBuilder builder = new StringBuilder();
- for (String key : topicFeeds.keySet()) {
- builder.append(key);
- builder.append(" ");
- }
- return new String(builder);
- }
+ private static String getValidTopics() {
+ StringBuilder builder = new StringBuilder();
+ for (String key : topicFeeds.keySet()) {
+ builder.append(key);
+ builder.append(" ");
+ }
+ return new String(builder);
+ }
}
-
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 8bd2fb0..9843a17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.Counters.Counter;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
@@ -116,12 +117,13 @@
parserClass = formatToParserMap.get(FORMAT_ADM);
}
}
-
+
}
- private void configureParser() throws Exception {
- dataParser = (IDataParser) Class.forName(parserClass).newInstance();
+ private IDataParser createDataParser() throws Exception {
+ IDataParser dataParser = (IDataParser) Class.forName(parserClass).newInstance();
dataParser.configure(configuration);
+ return dataParser;
}
private void configurePartitionConstraint() throws Exception {
@@ -177,8 +179,7 @@
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
inputSplits = inputSplitsProxy.toInputSplits(conf);
- configureParser();
- dataParser.initialize((ARecordType) atype, ctx);
+
reporter = new Reporter() {
@Override
@@ -223,27 +224,32 @@
SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[partition],
conf, reporter);
- inputStream = new SequenceToTextStream(reader, ctx);
+ inputStream = new HDFSStream(reader, ctx);
} else {
try {
- inputStream = fs.open(((org.apache.hadoop.mapred.FileSplit) inputSplits[partition]).getPath());
+ TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, reporter);
+ inputStream = new HDFSStream(reader, ctx);
} catch (FileNotFoundException e) {
throw new HyracksDataException(e);
}
}
+ IDataParser dataParser = createDataParser();
if (dataParser instanceof IDataStreamParser) {
((IDataStreamParser) dataParser).setInputStream(inputStream);
} else {
throw new IllegalArgumentException(" parser not compatible");
}
-
+ dataParser.configure(configuration);
+ dataParser.initialize((ARecordType) atype, ctx);
return dataParser;
}
}
-class SequenceToTextStream extends InputStream {
+class HDFSStream extends InputStream {
private ByteBuffer buffer;
private int capacity;
@@ -252,7 +258,7 @@
private final Object key;
private final Text value;
- public SequenceToTextStream(RecordReader reader, IHyracksTaskContext ctx) throws Exception {
+ public HDFSStream(RecordReader reader, IHyracksTaskContext ctx) throws Exception {
capacity = ctx.getFrameSize();
buffer = ByteBuffer.allocate(capacity);
this.reader = reader;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 385ffe5..bf64dd4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -70,8 +70,6 @@
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
- configureDataParser();
- dataParser.initialize((ARecordType) atype, ctx);
}
@Override
@@ -94,11 +92,15 @@
} catch (FileNotFoundException e) {
throw new HyracksDataException(e);
}
+
+ IDataParser dataParser = (IDataParser) Class.forName(parserClass).newInstance();
if (dataParser instanceof IDataStreamParser) {
((IDataStreamParser) dataParser).setInputStream(in);
} else {
throw new IllegalArgumentException(" parser not compatible");
}
+ dataParser.configure(configuration);
+ dataParser.initialize((ARecordType) atype, ctx);
return dataParser;
}
@@ -131,11 +133,6 @@
}
- private void configureDataParser() throws Exception {
- dataParser = (IDataParser) Class.forName(parserClass).newInstance();
- dataParser.configure(configuration);
- }
-
private void configureInputType() {
throw new UnsupportedOperationException(" Cannot resolve input type, operation not supported");
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 0cf8e95..d6c93c1 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -56,14 +56,13 @@
@Override
public IDataParser getDataParser(int partition) throws Exception {
- if (dataParser == null) {
- dataParser = new ManagedDelimitedDataStreamParser();
- ((IManagedDataParser) dataParser).setAdapter(this);
- dataParser.initialize((ARecordType) atype, ctx);
- IFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
- FeedStream feedStream = new FeedStream(feedClient, ctx);
- ((IDataStreamParser) dataParser).setInputStream(feedStream);
- }
+ IDataParser dataParser = new ManagedDelimitedDataStreamParser();
+ ((IManagedDataParser) dataParser).setAdapter(this);
+ dataParser.configure(configuration);
+ dataParser.initialize((ARecordType) atype, ctx);
+ IFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
+ FeedStream feedStream = new FeedStream(feedClient, ctx);
+ ((IDataStreamParser) dataParser).setInputStream(feedStream);
return dataParser;
}