Modified the HDFS adapter to evaluate partition constraints at compile time and store the input splits in serialized form. This also avoids multiple calls to the NameNode.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@129 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 7e3e65d..c297c08 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -128,6 +128,11 @@
                          <artifactId>rome</artifactId>
                          <version>1.0.1-modified-01</version>
                 </dependency>
+	        <dependency>
+		         <groupId>edu.uci.ics.hyracks</groupId>
+		         <artifactId>hyracks-dataflow-hadoop</artifactId>
+		         <version>0.2.1-SNAPSHOT</version>
+	        </dependency>
 <dependency>
             <groupId>jdom</groupId>
             <artifactId>jdom</artifactId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index cf0d4f3..4fef771 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -18,64 +18,92 @@
 
 import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
-public class ExternalDataScanOperatorDescriptor extends
-		AbstractSingleActivityOperatorDescriptor {
-	private static final long serialVersionUID = 1L;
+public class ExternalDataScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
 
-	private final String adapter;
-	private final Map<String, String> adapterConfiguration;
-	private final IAType atype;
+    private final String adapter;
+    private final Map<String, String> adapterConfiguration;
+    private final IAType atype;
 
-	private transient IDatasourceReadAdapter datasourceReadAdapter;
+    private transient IDatasourceReadAdapter datasourceReadAdapter;
 
-	public ExternalDataScanOperatorDescriptor(JobSpecification spec,
-			String adapter, Map<String, String> arguments, IAType atype,
-			RecordDescriptor rDesc) {
-		super(spec, 0, 1);
-		recordDescriptors[0] = rDesc;
-		this.adapter = adapter;
-		this.adapterConfiguration = arguments;
-		this.atype = atype;
-	}
+    public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
+            IAType atype, RecordDescriptor rDesc) {
+        super(spec, 0, 1);
+        recordDescriptors[0] = rDesc;
+        this.adapter = adapter;
+        this.adapterConfiguration = arguments;
+        this.atype = atype;
+    }
 
-	public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-			IRecordDescriptorProvider recordDescProvider, final int partition,
-			int nPartitions) throws HyracksDataException {
+    @Override
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+            ICCApplicationContext appCtx) {
+        
+        /*
+        The code below is commented out because the partition constraints are evaluated at compile time so that they
+        can be propagated to upstream Asterix operators. Hyracks does not yet provide a way to propagate constraints
+        to upstream operators; once it does, this code will be re-enabled.
+        
+        AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
+        switch (constraint.getPartitionConstraintType()) {
+            case ABSOLUTE:
+                String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
+                for (int i = 0; i < locations.length; ++i) {
+                    constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
+                            new ConstantExpression(locations[i])));
+                }
+                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
+                        new ConstantExpression(locations.length)));
 
-		try {
-			datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(
-					adapter).newInstance();
-			datasourceReadAdapter.configure(adapterConfiguration, atype);
-			datasourceReadAdapter.initialize(ctx);
-		} catch (Exception e) {
-			throw new HyracksDataException("initialization of adapter failed",
-					e);
-		}
-		return new AbstractUnaryOutputSourceOperatorNodePushable() {
-			@Override
-			public void initialize() throws HyracksDataException {
-				writer.open();
-				try {
-					datasourceReadAdapter.getDataParser(partition)
-							.parse(writer);
-				} catch (Exception e) {
-					throw new HyracksDataException(
-							"exception during reading from external data source",
-							e);
-				} finally {
-					writer.close();
-				}
-			}
-		};
-	}
+                break;
+            case COUNT:
+                constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
+                        new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
+                break;
+            default:
+                throw new IllegalStateException("Constraint type " + constraint.getPartitionConstraintType()
+                        + " is not supported");
+
+        }*/
+
+    }
+
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+            throws HyracksDataException {
+
+        try {
+            datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
+            datasourceReadAdapter.configure(adapterConfiguration, atype);
+            datasourceReadAdapter.initialize(ctx);
+        } catch (Exception e) {
+            throw new HyracksDataException("initialization of adapter failed", e);
+        }
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+            @Override
+            public void initialize() throws HyracksDataException {
+                writer.open();
+                try {
+                    datasourceReadAdapter.getDataParser(partition).parse(writer);
+                } catch (Exception e) {
+                    throw new HyracksDataException("exception during reading from external data source", e);
+                } finally {
+                    writer.close();
+                }
+            }
+        };
+    }
 }
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 205c855..b59f034 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -47,6 +47,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
 
 public class HDFSAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
@@ -54,12 +55,13 @@
     private String hdfsUrl;
     private List<String> hdfsPaths;
     private String inputFormatClassName;
-    private InputSplit[] inputSplits;
+    private Object[] inputSplits;
     private JobConf conf;
     private IHyracksTaskContext ctx;
     private Reporter reporter;
     private boolean isDelimited;
     private Character delimiter;
+    private InputSplitsProxy inputSplitsProxy;
     private static final Map<String, String> formatClassNames = new HashMap<String, String>();
 
     public static final String KEY_HDFS_URL = "hdfs";
@@ -121,6 +123,7 @@
 
     private void configurePartitionConstraint() throws Exception {
         InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
+        inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
         partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
         hdfsPaths = new ArrayList<String>();
         for (String hdfsPath : configuration.get(KEY_HDFS_PATH).split(",")) {
@@ -168,7 +171,7 @@
     @Override
     public void initialize(IHyracksTaskContext ctx) throws Exception {
         this.ctx = ctx;
-        inputSplits = conf.getInputFormat().getSplits(conf, 0);
+        inputSplits = inputSplitsProxy.toInputSplits(conf);
         dataParser.initialize((ARecordType) atype, ctx);
         reporter = new Reporter() {
 
@@ -212,7 +215,8 @@
         InputStream inputStream;
         if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
             SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
-            RecordReader reader = format.getRecordReader(inputSplits[partition], conf, reporter);
+            RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[partition],
+                    conf, reporter);
             inputStream = new SequenceToTextStream(reader, ctx);
         } else {
             try {