1) Modified HDFS adapter to use the Hadoop API for constructing a record reader to parse HDFSInputStream 2) Modified other adapters to swithc to a model where the parser instance is not shared across multiple instances 3) enabled HDFS cluster test case

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@137 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/runtimets/ignore.txt b/asterix-app/src/test/resources/runtimets/ignore.txt
index 7185e92..dd6a1be 100644
--- a/asterix-app/src/test/resources/runtimets/ignore.txt
+++ b/asterix-app/src/test/resources/runtimets/ignore.txt
@@ -14,4 +14,3 @@
 fuzzyjoin/events-users-aqlplus_1.aql
 fuzzyjoin
 failure/q1_pricing_summary_report_failure.aql
-dml/load
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index dd153ea..4544288 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -46,8 +46,6 @@
 
     protected IHyracksTaskContext ctx;
 
-    protected transient IDataParser dataParser;
-
     protected static final HashMap<ATypeTag, IValueParserFactory> typeToValueParserFactMap = new HashMap<ATypeTag, IValueParserFactory>();
 
     protected static final HashMap<String, String> formatToParserMap = new HashMap<String, String>();
@@ -96,14 +94,6 @@
         configuration.put(property, value);
     }
 
-    public IDataParser getParser() {
-        return dataParser;
-    }
-
-    public void setParser(IDataParser dataParser) {
-        this.dataParser = dataParser;
-    }
-
     public String getAdapterProperty(String attribute) {
         return configuration.get(attribute);
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 0673a24..4d969e4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -15,106 +15,97 @@
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 
-public class CNNFeedAdapter extends RSSFeedAdapter  implements
-		IDatasourceAdapter, IMutableFeedAdapter {
+public class CNNFeedAdapter extends RSSFeedAdapter implements IDatasourceAdapter, IMutableFeedAdapter {
 
-	private List<String> feedURLs = new ArrayList<String>();
-	private String id_prefix = "";
+    private List<String> feedURLs = new ArrayList<String>();
+    private String id_prefix = "";
 
-	public static final String KEY_RSS_URL = "topic";
-	public static final String KEY_INTERVAL = "interval";
+    public static final String KEY_RSS_URL = "topic";
+    public static final String KEY_INTERVAL = "interval";
 
-	private static Map<String, String> topicFeeds = new HashMap<String, String>();
+    private static Map<String, String> topicFeeds = new HashMap<String, String>();
 
-	public static final String TOP_STORIES = "topstories";
-	public static final String WORLD = "world";
-	public static final String US = "us";
-	public static final String SPORTS = "sports";
-	public static final String BUSINESS = "business";
-	public static final String POLITICS = "politics";
-	public static final String CRIME = "crime";
-	public static final String TECHNOLOGY = "technology";
-	public static final String HEALTH = "health";
-	public static final String ENTERNTAINMENT = "entertainemnt";
-	public static final String TRAVEL = "travel";
-	public static final String LIVING = "living";
-	public static final String VIDEO = "video";
-	public static final String STUDENT = "student";
-	public static final String POPULAR = "popular";
-	public static final String RECENT = "recent";
+    public static final String TOP_STORIES = "topstories";
+    public static final String WORLD = "world";
+    public static final String US = "us";
+    public static final String SPORTS = "sports";
+    public static final String BUSINESS = "business";
+    public static final String POLITICS = "politics";
+    public static final String CRIME = "crime";
+    public static final String TECHNOLOGY = "technology";
+    public static final String HEALTH = "health";
+    public static final String ENTERNTAINMENT = "entertainemnt";
+    public static final String TRAVEL = "travel";
+    public static final String LIVING = "living";
+    public static final String VIDEO = "video";
+    public static final String STUDENT = "student";
+    public static final String POPULAR = "popular";
+    public static final String RECENT = "recent";
 
-	private void initTopics() {
-		topicFeeds
-				.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
-		topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
-		topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
-		topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
-		topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
-		topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
-		topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
-		topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
-		topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
-		topicFeeds
-				.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
-		topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
-		topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
-		topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
-		topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
-		topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
-		topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
-	}
+    private void initTopics() {
+        topicFeeds.put(TOP_STORIES, "http://rss.cnn.com/rss/cnn_topstories.rss");
+        topicFeeds.put(WORLD, "http://rss.cnn.com/rss/cnn_world.rss");
+        topicFeeds.put(US, "http://rss.cnn.com/rss/cnn_us.rss");
+        topicFeeds.put(SPORTS, "http://rss.cnn.com/rss/si_topstories.rss");
+        topicFeeds.put(BUSINESS, "http://rss.cnn.com/rss/money_latest.rss");
+        topicFeeds.put(POLITICS, "http://rss.cnn.com/rss/cnn_allpolitics.rss");
+        topicFeeds.put(CRIME, "http://rss.cnn.com/rss/cnn_crime.rss");
+        topicFeeds.put(TECHNOLOGY, "http://rss.cnn.com/rss/cnn_tech.rss");
+        topicFeeds.put(HEALTH, "http://rss.cnn.com/rss/cnn_health.rss");
+        topicFeeds.put(ENTERNTAINMENT, "http://rss.cnn.com/rss/cnn_showbiz.rss");
+        topicFeeds.put(LIVING, "http://rss.cnn.com/rss/cnn_living.rss");
+        topicFeeds.put(VIDEO, "http://rss.cnn.com/rss/cnn_freevideo.rss");
+        topicFeeds.put(TRAVEL, "http://rss.cnn.com/rss/cnn_travel.rss");
+        topicFeeds.put(STUDENT, "http://rss.cnn.com/rss/cnn_studentnews.rss");
+        topicFeeds.put(POPULAR, "http://rss.cnn.com/rss/cnn_mostpopular.rss");
+        topicFeeds.put(RECENT, "http://rss.cnn.com/rss/cnn_latest.rss");
+    }
 
-	@Override
-	public IDataParser getDataParser(int partition) throws Exception {
-		if (dataParser == null) {
-			dataParser = new ManagedDelimitedDataStreamParser();
-			dataParser.configure(configuration);
-			dataParser.initialize((ARecordType) atype, ctx);
-			RSSFeedClient feedClient = new RSSFeedClient(this,
-					feedURLs.get(partition), id_prefix);
-			FeedStream feedStream = new FeedStream(feedClient, ctx);
-			((IDataStreamParser) dataParser).setInputStream(feedStream);
-		}
-		return dataParser;
-	}
+    @Override
+    public IDataParser getDataParser(int partition) throws Exception {
+        IDataParser dataParser = new ManagedDelimitedDataStreamParser();
+        dataParser.configure(configuration);
+        dataParser.initialize((ARecordType) atype, ctx);
+        RSSFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
+        FeedStream feedStream = new FeedStream(feedClient, ctx);
+        ((IDataStreamParser) dataParser).setInputStream(feedStream);
+        return dataParser;
+    }
 
-	@Override
-	public void configure(Map<String, String> arguments, IAType atype)
-			throws Exception {
-		configuration = arguments;
-		this.atype = atype;
-		String rssURLProperty = configuration.get(KEY_RSS_URL);
-		if (rssURLProperty == null) {
-			throw new IllegalArgumentException("no rss url provided");
-		}
-		initializeFeedURLs(rssURLProperty);
-		configurePartitionConstraints();
+    @Override
+    public void configure(Map<String, String> arguments, IAType atype) throws Exception {
+        configuration = arguments;
+        this.atype = atype;
+        String rssURLProperty = configuration.get(KEY_RSS_URL);
+        if (rssURLProperty == null) {
+            throw new IllegalArgumentException("no rss url provided");
+        }
+        initializeFeedURLs(rssURLProperty);
+        configurePartitionConstraints();
 
-	}
+    }
 
-	private void initializeFeedURLs(String rssURLProperty) {
-		feedURLs.clear();
-		String[] rssTopics = rssURLProperty.split(",");
-		initTopics();
-		for (String topic : rssTopics) {
-			String feedURL = topicFeeds.get(topic);
-			if (feedURL == null) {
-				throw new IllegalArgumentException(" unknown topic :" + topic
-						+ " please choose from the following "
-						+ getValidTopics());
-			}
-			feedURLs.add(feedURL);
-		}
-	}
+    private void initializeFeedURLs(String rssURLProperty) {
+        feedURLs.clear();
+        String[] rssTopics = rssURLProperty.split(",");
+        initTopics();
+        for (String topic : rssTopics) {
+            String feedURL = topicFeeds.get(topic);
+            if (feedURL == null) {
+                throw new IllegalArgumentException(" unknown topic :" + topic + " please choose from the following "
+                        + getValidTopics());
+            }
+            feedURLs.add(feedURL);
+        }
+    }
 
-	private static String getValidTopics() {
-		StringBuilder builder = new StringBuilder();
-		for (String key : topicFeeds.keySet()) {
-			builder.append(key);
-			builder.append(" ");
-		}
-		return new String(builder);
-	}
+    private static String getValidTopics() {
+        StringBuilder builder = new StringBuilder();
+        for (String key : topicFeeds.keySet()) {
+            builder.append(key);
+            builder.append(" ");
+        }
+        return new String(builder);
+    }
 
 }
-
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 8bd2fb0..9843a17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.Counters.Counter;
 
 import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
@@ -116,12 +117,13 @@
                 parserClass = formatToParserMap.get(FORMAT_ADM);
             }
         }
-       
+
     }
 
-    private void configureParser() throws Exception {
-        dataParser = (IDataParser) Class.forName(parserClass).newInstance();
+    private IDataParser createDataParser() throws Exception {
+        IDataParser dataParser = (IDataParser) Class.forName(parserClass).newInstance();
         dataParser.configure(configuration);
+        return dataParser;
     }
 
     private void configurePartitionConstraint() throws Exception {
@@ -177,8 +179,7 @@
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
         inputSplits = inputSplitsProxy.toInputSplits(conf);
-        configureParser();
-        dataParser.initialize((ARecordType) atype, ctx);
+
         reporter = new Reporter() {
 
             @Override
@@ -223,27 +224,32 @@
             SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
             RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[partition],
                     conf, reporter);
-            inputStream = new SequenceToTextStream(reader, ctx);
+            inputStream = new HDFSStream(reader, ctx);
         } else {
             try {
-                inputStream = fs.open(((org.apache.hadoop.mapred.FileSplit) inputSplits[partition]).getPath());
+                TextInputFormat format = (TextInputFormat) conf.getInputFormat();
+                RecordReader reader = format.getRecordReader(
+                        (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, reporter);
+                inputStream = new HDFSStream(reader, ctx);
             } catch (FileNotFoundException e) {
                 throw new HyracksDataException(e);
             }
         }
 
+        IDataParser dataParser = createDataParser();
         if (dataParser instanceof IDataStreamParser) {
             ((IDataStreamParser) dataParser).setInputStream(inputStream);
         } else {
             throw new IllegalArgumentException(" parser not compatible");
         }
-
+        dataParser.configure(configuration);
+        dataParser.initialize((ARecordType) atype, ctx);
         return dataParser;
     }
 
 }
 
-class SequenceToTextStream extends InputStream {
+class HDFSStream extends InputStream {
 
     private ByteBuffer buffer;
     private int capacity;
@@ -252,7 +258,7 @@
     private final Object key;
     private final Text value;
 
-    public SequenceToTextStream(RecordReader reader, IHyracksTaskContext ctx) throws Exception {
+    public HDFSStream(RecordReader reader, IHyracksTaskContext ctx) throws Exception {
         capacity = ctx.getFrameSize();
         buffer = ByteBuffer.allocate(capacity);
         this.reader = reader;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index 385ffe5..bf64dd4 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -70,8 +70,6 @@
     @Override
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
-        configureDataParser();
-        dataParser.initialize((ARecordType) atype, ctx);
     }
 
     @Override
@@ -94,11 +92,15 @@
         } catch (FileNotFoundException e) {
             throw new HyracksDataException(e);
         }
+
+        IDataParser dataParser = (IDataParser) Class.forName(parserClass).newInstance();
         if (dataParser instanceof IDataStreamParser) {
             ((IDataStreamParser) dataParser).setInputStream(in);
         } else {
             throw new IllegalArgumentException(" parser not compatible");
         }
+        dataParser.configure(configuration);
+        dataParser.initialize((ARecordType) atype, ctx);
         return dataParser;
     }
 
@@ -131,11 +133,6 @@
 
     }
 
-    private void configureDataParser() throws Exception {
-        dataParser = (IDataParser) Class.forName(parserClass).newInstance();
-        dataParser.configure(configuration);
-    }
-
     private void configureInputType() {
         throw new UnsupportedOperationException(" Cannot resolve input type, operation not supported");
     }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 0cf8e95..d6c93c1 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -56,14 +56,13 @@
 
     @Override
     public IDataParser getDataParser(int partition) throws Exception {
-        if (dataParser == null) {
-            dataParser = new ManagedDelimitedDataStreamParser();
-            ((IManagedDataParser) dataParser).setAdapter(this);
-            dataParser.initialize((ARecordType) atype, ctx);
-            IFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
-            FeedStream feedStream = new FeedStream(feedClient, ctx);
-            ((IDataStreamParser) dataParser).setInputStream(feedStream);
-        }
+        IDataParser dataParser = new ManagedDelimitedDataStreamParser();
+        ((IManagedDataParser) dataParser).setAdapter(this);
+        dataParser.configure(configuration);
+        dataParser.initialize((ARecordType) atype, ctx);
+        IFeedClient feedClient = new RSSFeedClient(this, feedURLs.get(partition), id_prefix);
+        FeedStream feedStream = new FeedStream(feedClient, ctx);
+        ((IDataStreamParser) dataParser).setInputStream(feedStream);
         return dataParser;
     }