modified HDFS adapter to evaluate partition constraints at compile time and store the serialized form of the input splits. This also avoids multiple calls to the namenode
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@129 eaa15691-b419-025a-1212-ee371bd00084
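
A minimal sketch of the idea behind this change (distilled from the HDFSAdapter hunks below; the surrounding class and method signatures here are illustrative simplifications, only the getSplits() and InputSplitsProxy calls are taken from the patch):

    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;

    class SplitSerializationSketch {
        private InputSplitsProxy inputSplitsProxy; // serializable wrapper that ships with the operator descriptor
        private Object[] inputSplits;              // splits reconstructed at run time, one per partition

        // Compile time (on the cluster controller): a single getSplits() round trip to the namenode,
        // captured in serialized form; the split count also drives the partition count constraint.
        void configurePartitionConstraint(JobConf conf) throws Exception {
            InputSplit[] splits = conf.getInputFormat().getSplits(conf, 0);
            inputSplitsProxy = new InputSplitsProxy(conf, splits);
        }

        // Run time (on each node controller): rebuild the splits locally from the serialized proxy,
        // with no further namenode calls.
        void initialize(JobConf conf) throws Exception {
            inputSplits = inputSplitsProxy.toInputSplits(conf);
        }
    }
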
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 7e3e65d..c297c08 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -128,6 +128,11 @@
<artifactId>rome</artifactId>
<version>1.0.1-modified-01</version>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <version>0.2.1-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>jdom</groupId>
<artifactId>jdom</artifactId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index cf0d4f3..4fef771 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -18,64 +18,92 @@
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class ExternalDataScanOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+public class ExternalDataScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
- private final String adapter;
- private final Map<String, String> adapterConfiguration;
- private final IAType atype;
+ private final String adapter;
+ private final Map<String, String> adapterConfiguration;
+ private final IAType atype;
- private transient IDatasourceReadAdapter datasourceReadAdapter;
+ private transient IDatasourceReadAdapter datasourceReadAdapter;
- public ExternalDataScanOperatorDescriptor(JobSpecification spec,
- String adapter, Map<String, String> arguments, IAType atype,
- RecordDescriptor rDesc) {
- super(spec, 0, 1);
- recordDescriptors[0] = rDesc;
- this.adapter = adapter;
- this.adapterConfiguration = arguments;
- this.atype = atype;
- }
+ public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
+ IAType atype, RecordDescriptor rDesc) {
+ super(spec, 0, 1);
+ recordDescriptors[0] = rDesc;
+ this.adapter = adapter;
+ this.adapterConfiguration = arguments;
+ this.atype = atype;
+ }
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition,
- int nPartitions) throws HyracksDataException {
+ @Override
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ ICCApplicationContext appCtx) {
+
+    /*
+     The following code is commented out because the partition constraints are currently set at compile time, so that
+     they can be propagated to upstream Asterix operators. Once Hyracks provides a way to propagate constraints to
+     upstream operators at scheduling time, this code will be re-enabled.
+
+ AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
+ switch (constraint.getPartitionConstraintType()) {
+ case ABSOLUTE:
+ String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
+ for (int i = 0; i < locations.length; ++i) {
+ constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
+ new ConstantExpression(locations[i])));
+ }
+ constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
+ new ConstantExpression(locations.length)));
- try {
- datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(
- adapter).newInstance();
- datasourceReadAdapter.configure(adapterConfiguration, atype);
- datasourceReadAdapter.initialize(ctx);
- } catch (Exception e) {
- throw new HyracksDataException("initialization of adapter failed",
- e);
- }
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
- @Override
- public void initialize() throws HyracksDataException {
- writer.open();
- try {
- datasourceReadAdapter.getDataParser(partition)
- .parse(writer);
- } catch (Exception e) {
- throw new HyracksDataException(
- "exception during reading from external data source",
- e);
- } finally {
- writer.close();
- }
- }
- };
- }
+ break;
+ case COUNT:
+ constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
+ new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
+ break;
+ default:
+ throw new IllegalStateException(" Constraint type :" + constraint.getPartitionConstraintType()
+ + " not supported");
+
+ }*/
+
+ }
+
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+
+ try {
+ datasourceReadAdapter = (IDatasourceReadAdapter) Class.forName(adapter).newInstance();
+ datasourceReadAdapter.configure(adapterConfiguration, atype);
+ datasourceReadAdapter.initialize(ctx);
+ } catch (Exception e) {
+ throw new HyracksDataException("initialization of adapter failed", e);
+ }
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ writer.open();
+ try {
+ datasourceReadAdapter.getDataParser(partition).parse(writer);
+ } catch (Exception e) {
+ throw new HyracksDataException("exception during reading from external data source", e);
+ } finally {
+ writer.close();
+ }
+ }
+ };
+ }
}
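
For illustration only (not part of this patch): with the constraint computed at compile time, the compiler-side flow is roughly the following. The caller shown here is a hypothetical stand-in for the Asterix compilation code; only configure() and getPartitionConstraint() come from the adapter API referenced in the hunks above.

    // Hypothetical compile-time caller (illustrative; not taken from this patch).
    IDatasourceReadAdapter adapter = (IDatasourceReadAdapter) Class.forName(adapterClassName).newInstance();
    adapter.configure(adapterConfiguration, atype);   // HDFSAdapter computes splits and the count constraint here, once
    AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
    // The compiler attaches 'constraint' to the external-data scan operator, so
    // contributeSchedulingConstraints() above no longer needs to recompute splits at scheduling time.
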
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 205c855..b59f034 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
public class HDFSAdapter extends AbstractDatasourceAdapter implements IDatasourceReadAdapter {
@@ -54,12 +55,13 @@
private String hdfsUrl;
private List<String> hdfsPaths;
private String inputFormatClassName;
- private InputSplit[] inputSplits;
+ private Object[] inputSplits;
private JobConf conf;
private IHyracksTaskContext ctx;
private Reporter reporter;
private boolean isDelimited;
private Character delimiter;
+ private InputSplitsProxy inputSplitsProxy;
private static final Map<String, String> formatClassNames = new HashMap<String, String>();
public static final String KEY_HDFS_URL = "hdfs";
@@ -121,6 +123,7 @@
private void configurePartitionConstraint() throws Exception {
InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
+ inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
hdfsPaths = new ArrayList<String>();
for (String hdfsPath : configuration.get(KEY_HDFS_PATH).split(",")) {
@@ -168,7 +171,7 @@
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
- inputSplits = conf.getInputFormat().getSplits(conf, 0);
+ inputSplits = inputSplitsProxy.toInputSplits(conf);
dataParser.initialize((ARecordType) atype, ctx);
reporter = new Reporter() {
@@ -212,7 +215,8 @@
InputStream inputStream;
if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(inputSplits[partition], conf, reporter);
+ RecordReader reader = format.getRecordReader((org.apache.hadoop.mapred.FileSplit) inputSplits[partition],
+ conf, reporter);
inputStream = new SequenceToTextStream(reader, ctx);
} else {
try {