Issue no 102: Testing on a small rainbow cluster or as part of the regular build process did not reveal some issues that were observed when running on a big Yahoo cluster. The issue observed was an HTTP 500 error Reason: org.hadoop.apache.mapred.Reporter. This check-in makes a slight modification to the HDFS adapter to not re-use the reporter instance across mutliple operator instances.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@139 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 9843a17..f7c89ff 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -58,7 +58,6 @@
     private Object[] inputSplits;
     private transient JobConf conf;
     private IHyracksTaskContext ctx;
-    private Reporter reporter;
     private boolean isDelimited;
     private Character delimiter;
     private InputSplitsProxy inputSplitsProxy;
@@ -179,8 +178,10 @@
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
         inputSplits = inputSplitsProxy.toInputSplits(conf);
+    }
 
-        reporter = new Reporter() {
+    private Reporter getReporter() {
+        Reporter reporter = new Reporter() {
 
             @Override
             public Counter getCounter(Enum<?> arg0) {
@@ -213,8 +214,10 @@
             public void progress() {
             }
         };
+        
+        return reporter;
     }
-
+    
     @Override
     public IDataParser getDataParser(int partition) throws Exception {
         Path path = new Path(inputSplits[partition].toString());
@@ -223,13 +226,13 @@
         if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
             SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
             RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[partition],
-                    conf, reporter);
+                    conf, getReporter());
             inputStream = new HDFSStream(reader, ctx);
         } else {
             try {
                 TextInputFormat format = (TextInputFormat) conf.getInputFormat();
                 RecordReader reader = format.getRecordReader(
-                        (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, reporter);
+                        (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
                 inputStream = new HDFSStream(reader, ctx);
             } catch (FileNotFoundException e) {
                 throw new HyracksDataException(e);