Merge branch 'master' into yingyi/asterix_beta_fix
diff --git a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
index 1b9a4d6..b39fced 100644
--- a/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
+++ b/asterix-app/src/test/resources/hadoop/conf/mapred-site.xml
@@ -5,21 +5,21 @@
<configuration>
- <property>
- <name>mapred.job.tracker</name>
- <value>localhost:29007</value>
- </property>
- <property>
- <name>mapred.tasktracker.map.tasks.maximum</name>
- <value>20</value>
- </property>
- <property>
- <name>mapred.tasktracker.reduce.tasks.maximum</name>
- <value>20</value>
- </property>
- <property>
- <name>mapred.min.split.size</name>
- <value>65536</value>
- </property>
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>128</value>
+ </property>
</configuration>
diff --git a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
index 653ee6c..527a0e5 100644
--- a/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/hdfs/issue_245_hdfs/issue_245_hdfs.3.query.aql
@@ -8,4 +8,5 @@
use dataverse test;
for $x in dataset('TextDataset')
+order by $x.line
return $x
diff --git a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
index 8af2f5f..59425b1 100644
--- a/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
+++ b/asterix-app/src/test/resources/runtimets/results/hdfs/issue_245_hdfs/issue_245_hdfs.1.adm
@@ -1,4 +1,4 @@
+{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
+{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
{ "line": "The ASTERIX project is developing new technologies for ingesting, storing, managing, indexing, querying, analyzing, and subscribing to vast quantities of semi-structured information" }
{ "line": "The project is combining ideas from three distinct areas semi-structured data, parallel databases, and data-intensive computing to create a next-generation, open source software platform that scales by running on large, shared-nothing commodity computing clusters" }
-{ "line": "ASTERIX targets a wide range of semi-structured information, ranging from data use cases where information is well-tagged and highly regular to content use cases where data is irregular and much of each datum is textual" }
-{ "line": "ASTERIX is taking an open stance on data formats and addressing research issues including highly scalable data storage and indexing, semi-structured query processing on very large clusters, and merging parallel database techniques with todays data-intensive computing techniques to support performant yet declarative solutions to the problem of analyzing semi-structured information" }
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 842cd67..aff6967 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -131,7 +131,8 @@
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-dataflow-hadoop</artifactId>
+ <artifactId>hyracks-hdfs-core</artifactId>
+ <version>${hyracks.version}</version>
</dependency>
<dependency>
<groupId>jdom</groupId>
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
index f1f5884..8c8880a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/CNNFeedAdapterFactory.java
@@ -23,9 +23,10 @@
* A factory class for creating the @see {CNNFeedAdapter}.
*/
public class CNNFeedAdapterFactory implements ITypedDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
CNNFeedAdapter cnnFeedAdapter = new CNNFeedAdapter();
cnnFeedAdapter.configure(configuration);
return cnnFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
index 6fcb710..c267658 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HDFSAdapterFactory.java
@@ -14,22 +14,79 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
/**
* A factory class for creating an instance of HDFSAdapter
*/
+@SuppressWarnings("deprecation")
public class HDFSAdapterFactory implements IGenericDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
public static final String HDFS_ADAPTER_NAME = "hdfs";
+ public static final String CLUSTER_LOCATIONS = "cluster-locations";
+ public static final String SCHEDULER = "hdfs-scheduler";
+
+ public static final String KEY_HDFS_URL = "hdfs";
+ public static final String KEY_PATH = "path";
+ public static final String KEY_INPUT_FORMAT = "input-format";
+ public static final String INPUT_FORMAT_TEXT = "text-input-format";
+ public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+ private transient AlgebricksPartitionConstraint clusterLocations;
+ private String[] readSchedule;
+ private boolean executed[];
+ private InputSplitsFactory inputSplitsFactory;
+ private ConfFactory confFactory;
+ private boolean setup = false;
+
+ private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+ private static Map<String, String> initInputFormatMap() {
+ Map<String, String> formatClassNames = new HashMap<String, String>();
+ formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+ formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+ return formatClassNames;
+ }
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
- HDFSAdapter hdfsAdapter = new HDFSAdapter(atype);
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+ if (!setup) {
+ /** One-time setup of the factory's serializable state; this block runs only once per factory instance. */
+ JobConf conf = configureJobConf(configuration);
+ confFactory = new ConfFactory(conf);
+
+ clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+ int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+ InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+ inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+ Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+ readSchedule = scheduler.getLocationConstraints(inputSplits);
+ executed = new boolean[readSchedule.length];
+ Arrays.fill(executed, false);
+
+ setup = true;
+ }
+ JobConf conf = confFactory.getConf();
+ InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+ HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
hdfsAdapter.configure(configuration);
return hdfsAdapter;
}
@@ -39,4 +96,15 @@
return HDFS_ADAPTER_NAME;
}
+ private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+ JobConf conf = new JobConf();
+ conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+ conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+ conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+ conf.set("mapred.input.format.class",
+ (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+ return conf;
+ }
+
}
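
For reference, a minimal usage sketch of the new factory contract. The values below are hypothetical (namenode URL, path, node names, CC address, and recordType); in AsterixDB the compiler injects the scheduler and cluster locations into the configuration map before calling createAdapter, and the remaining format options are omitted:

    Map<String, Object> configuration = new HashMap<String, Object>();
    configuration.put(HDFSAdapterFactory.KEY_HDFS_URL, "hdfs://localhost:10009");      // hypothetical namenode
    configuration.put(HDFSAdapterFactory.KEY_PATH, "/asterix/input.txt");              // hypothetical HDFS path
    configuration.put(HDFSAdapterFactory.KEY_INPUT_FORMAT, HDFSAdapterFactory.INPUT_FORMAT_TEXT);
    configuration.put(AbstractDatasourceAdapter.KEY_FORMAT, AbstractDatasourceAdapter.FORMAT_DELIMITED_TEXT);
    configuration.put(HDFSAdapterFactory.CLUSTER_LOCATIONS,
            new AlgebricksAbsolutePartitionConstraint(new String[] { "nc1", "nc2" })); // hypothetical NCs
    configuration.put(HDFSAdapterFactory.SCHEDULER, new Scheduler("127.0.0.1", 1099)); // hypothetical CC net address
    IDatasourceAdapter adapter = new HDFSAdapterFactory().createAdapter(configuration, recordType);
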
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 5e28eed..1845c07 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -14,26 +14,96 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
/**
* A factory class for creating an instance of HiveAdapter
*/
+@SuppressWarnings("deprecation")
public class HiveAdapterFactory implements IGenericDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final String HDFS_ADAPTER_NAME = "hdfs";
+ public static final String CLUSTER_LOCATIONS = "cluster-locations";
+ public static final String SCHEDULER = "hdfs-scheduler";
+
+ public static final String KEY_HDFS_URL = "hdfs";
+ public static final String KEY_PATH = "path";
+ public static final String KEY_INPUT_FORMAT = "input-format";
+ public static final String INPUT_FORMAT_TEXT = "text-input-format";
+ public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
+
+ private String[] readSchedule;
+ private boolean executed[];
+ private InputSplitsFactory inputSplitsFactory;
+ private ConfFactory confFactory;
+ private AlgebricksPartitionConstraint clusterLocations;
+ private boolean setup = false;
+
+ private static final Map<String, String> formatClassNames = initInputFormatMap();
+
+ private static Map<String, String> initInputFormatMap() {
+ Map<String, String> formatClassNames = new HashMap<String, String>();
+ formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
+ formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
+ return formatClassNames;
+ }
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
- HiveAdapter hiveAdapter = new HiveAdapter(type);
- hiveAdapter.configure(configuration);
- return hiveAdapter;
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
+ if (!setup) {
+ /** One-time setup of the factory's serializable state; this block runs only once per factory instance. */
+ JobConf conf = configureJobConf(configuration);
+ confFactory = new ConfFactory(conf);
+
+ clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+ int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+ InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
+ inputSplitsFactory = new InputSplitsFactory(inputSplits);
+
+ Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
+ readSchedule = scheduler.getLocationConstraints(inputSplits);
+ executed = new boolean[readSchedule.length];
+ Arrays.fill(executed, false);
+
+ setup = true;
+ }
+ JobConf conf = confFactory.getConf();
+ InputSplit[] inputSplits = inputSplitsFactory.getSplits();
+ HiveAdapter hiveAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+ return hiveAdapter;
}
@Override
public String getName() {
return "hive";
}
+
+ private JobConf configureJobConf(Map<String, Object> configuration) throws Exception {
+ JobConf conf = new JobConf();
+ conf.set("fs.default.name", ((String) configuration.get(KEY_HDFS_URL)).trim());
+ conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ conf.setClassLoader(HDFSAdapter.class.getClassLoader());
+ conf.set("mapred.input.dir", ((String) configuration.get(KEY_PATH)).trim());
+ conf.set("mapred.input.format.class",
+ (String) formatClassNames.get(((String) configuration.get(KEY_INPUT_FORMAT)).trim()));
+ return conf;
+ }
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
index 45fd6cf..697c8ea 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IAdapterFactory.java
@@ -14,12 +14,14 @@
*/
package edu.uci.ics.asterix.external.adapter.factory;
+import java.io.Serializable;
+
/**
* Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
* Acts as a marker interface indicating that the implementation provides functionality
* for creating an adapter.
*/
-public interface IAdapterFactory {
+public interface IAdapterFactory extends Serializable {
/**
* Returns the display name corresponding to the Adapter type that is created by the factory.
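
The Serializable bound added here is what allows a factory instance, rather than a class name, to travel inside an operator descriptor from the cluster controller to the node controllers. A minimal round-trip sketch (NCFileSystemAdapterFactory stands in for any implementation):

    IAdapterFactory factory = new NCFileSystemAdapterFactory();
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new ObjectOutputStream(bytes).writeObject(factory);               // what happens when the job is shipped
    IAdapterFactory shipped = (IAdapterFactory) new ObjectInputStream(
            new ByteArrayInputStream(bytes.toByteArray())).readObject(); // reconstructed on the remote node
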
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
index 093a3dd..e8d120a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/IGenericDatasetAdapterFactory.java
@@ -38,6 +38,6 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception;
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
index 0f9978e..84a5ca8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/ITypedDatasetAdapterFactory.java
@@ -33,6 +33,6 @@
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception;
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
index 2040949..659fd23 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/NCFileSystemAdapterFactory.java
@@ -26,11 +26,11 @@
* an NC.
*/
public class NCFileSystemAdapterFactory implements IGenericDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String NC_FILE_SYSTEM_ADAPTER_NAME = "localfs";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType atype) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType atype) throws Exception {
NCFileSystemAdapter fsAdapter = new NCFileSystemAdapter(atype);
fsAdapter.configure(configuration);
return fsAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
index bc00469..e63be17 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/PullBasedTwitterAdapterFactory.java
@@ -25,11 +25,11 @@
* via pull-based Twitter API.
*/
public class PullBasedTwitterAdapterFactory implements ITypedDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String PULL_BASED_TWITTER_ADAPTER_NAME = "pull_twitter";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
PullBasedTwitterAdapter twitterAdapter = new PullBasedTwitterAdapter();
twitterAdapter.configure(configuration);
return twitterAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
index bbbea38..3cd22e8 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/RSSFeedAdapterFactory.java
@@ -24,11 +24,11 @@
* RSSFeedAdapter provides the functionality of fetching an RSS based feed.
*/
public class RSSFeedAdapterFactory implements ITypedDatasetAdapterFactory {
-
+ private static final long serialVersionUID = 1L;
public static final String RSS_FEED_ADAPTER_NAME = "rss_feed";
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration) throws Exception {
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration) throws Exception {
RSSFeedAdapter rssFeedAdapter = new RSSFeedAdapter();
rssFeedAdapter.configure(configuration);
return rssFeedAdapter;
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
index fb4cc99..f9f72cf 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/ExternalDataScanOperatorDescriptor.java
@@ -19,8 +19,6 @@
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -38,61 +36,24 @@
private static final long serialVersionUID = 1L;
- private final String adapterFactory;
- private final Map<String, String> adapterConfiguration;
+ private final Map<String, Object> adapterConfiguration;
private final IAType atype;
private IGenericDatasetAdapterFactory datasourceAdapterFactory;
- public ExternalDataScanOperatorDescriptor(JobSpecification spec, String adapter, Map<String, String> arguments,
- IAType atype, RecordDescriptor rDesc) {
+ public ExternalDataScanOperatorDescriptor(JobSpecification spec, Map<String, Object> arguments, IAType atype,
+ RecordDescriptor rDesc, IGenericDatasetAdapterFactory dataSourceAdapterFactory) {
super(spec, 0, 1);
recordDescriptors[0] = rDesc;
- this.adapterFactory = adapter;
this.adapterConfiguration = arguments;
this.atype = atype;
- }
-
- @Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
-
- /*
- Comment: The following code is commented out. This is because constraints are being set at compile time so that they can
- be propagated to upstream Asterix operators. Hyracks has to provide a way to propagate constraints to upstream operators.
- Once it is there, we will uncomment the following code.
-
- AlgebricksPartitionConstraint constraint = datasourceReadAdapter.getPartitionConstraint();
- switch (constraint.getPartitionConstraintType()) {
- case ABSOLUTE:
- String[] locations = ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations();
- for (int i = 0; i < locations.length; ++i) {
- constraintAcceptor.addConstraint(new Constraint(new PartitionLocationExpression(this.odId, i),
- new ConstantExpression(locations[i])));
- }
- constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
- new ConstantExpression(locations.length)));
-
- break;
- case COUNT:
- constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(this.odId),
- new ConstantExpression(((AlgebricksCountPartitionConstraint) constraint).getCount())));
- break;
- default:
- throw new IllegalStateException(" Constraint type :" + constraint.getPartitionConstraintType()
- + " not supported");
-
- }*/
-
+ this.datasourceAdapterFactory = dataSourceAdapterFactory;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
- try {
- datasourceAdapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactory).newInstance();
- } catch (Exception e) {
- throw new HyracksDataException("initialization of adapter failed", e);
- }
+
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
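
The net effect of this refactoring: the adapter factory is resolved once at compile time and passed into the descriptor, instead of being re-instantiated by Class.forName on every node. A hedged wiring sketch, where jobSpec, configuration, recordType, and scannerDesc are placeholders from the surrounding compilation:

    IGenericDatasetAdapterFactory factory = new HDFSAdapterFactory();
    ExternalDataScanOperatorDescriptor scanner = new ExternalDataScanOperatorDescriptor(
            jobSpec, configuration, recordType, scannerDesc, factory);
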
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
index 2da4e76..f07168a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/data/operator/FeedIntakeOperatorDescriptor.java
@@ -35,52 +35,46 @@
* Operator responsible for ingesting data from an external source. This
* operator uses a (configurable) adapter associated with the feed dataset.
*/
-public class FeedIntakeOperatorDescriptor extends
- AbstractSingleActivityOperatorDescriptor {
+public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- private final String adapterFactoryClassName;
- private final Map<String, String> adapterConfiguration;
- private final IAType atype;
- private final FeedId feedId;
+ private final String adapterFactoryClassName;
+ private final Map<String, Object> adapterConfiguration;
+ private final IAType atype;
+ private final FeedId feedId;
+ private final IAdapterFactory datasourceAdapterFactory;
- private transient IAdapterFactory datasourceAdapterFactory;
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId, String adapter,
+ Map<String, Object> arguments, ARecordType atype, RecordDescriptor rDesc,
+ IAdapterFactory datasourceAdapterFactory) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = rDesc;
+ this.adapterFactoryClassName = adapter;
+ this.adapterConfiguration = arguments;
+ this.atype = atype;
+ this.feedId = feedId;
+ this.datasourceAdapterFactory = datasourceAdapterFactory;
+ }
- public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedId feedId,
- String adapter, Map<String, String> arguments, ARecordType atype,
- RecordDescriptor rDesc) {
- super(spec, 1, 1);
- recordDescriptors[0] = rDesc;
- this.adapterFactoryClassName = adapter;
- this.adapterConfiguration = arguments;
- this.atype = atype;
- this.feedId = feedId;
- }
-
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition,
- int nPartitions) throws HyracksDataException {
- ITypedDatasourceAdapter adapter;
- try {
- datasourceAdapterFactory = (IAdapterFactory) Class.forName(
- adapterFactoryClassName).newInstance();
- if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
- adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
- .createAdapter(adapterConfiguration, atype);
- } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
- adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
- .createAdapter(adapterConfiguration);
- } else {
- throw new IllegalStateException(
- " Unknown adapter factory type for "
- + adapterFactoryClassName);
- }
- adapter.initialize(ctx);
- } catch (Exception e) {
- throw new HyracksDataException("initialization of adapter failed",
- e);
- }
- return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
- }
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ ITypedDatasourceAdapter adapter;
+ try {
+ if (datasourceAdapterFactory instanceof IGenericDatasetAdapterFactory) {
+ adapter = (ITypedDatasourceAdapter) ((IGenericDatasetAdapterFactory) datasourceAdapterFactory)
+ .createAdapter(adapterConfiguration, atype);
+ } else if (datasourceAdapterFactory instanceof ITypedDatasetAdapterFactory) {
+ adapter = (ITypedDatasourceAdapter) ((ITypedDatasetAdapterFactory) datasourceAdapterFactory)
+ .createAdapter(adapterConfiguration);
+ } else {
+ throw new IllegalStateException(" Unknown adapter factory type for " + adapterFactoryClassName);
+ }
+ adapter.initialize(ctx);
+ } catch (Exception e) {
+ throw new HyracksDataException("initialization of adapter failed", e);
+ }
+ return new FeedIntakeOperatorNodePushable(feedId, adapter, partition);
+ }
}
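
The feed-intake descriptor follows the same pattern as the scan descriptor: the factory now arrives pre-built (and serializable), so createPushRuntime no longer performs reflective instantiation. A sketch with placeholder arguments mirroring the call site in AqlMetadataProvider:

    FeedIntakeOperatorDescriptor ingestor = new FeedIntakeOperatorDescriptor(jobSpec,
            new FeedId(dataverseName, datasetName), adapterClassName, arguments,
            feedRecordType, feedRecordDesc, adapterFactory);
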
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
index 440ee8c..23e545d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/AbstractDatasourceAdapter.java
@@ -36,7 +36,7 @@
private static final long serialVersionUID = 1L;
- protected Map<String, String> configuration;
+ protected Map<String, Object> configuration;
protected transient AlgebricksPartitionConstraint partitionConstraint;
protected IAType atype;
protected IHyracksTaskContext ctx;
@@ -51,15 +51,15 @@
typeToValueParserFactMap.put(ATypeTag.STRING, UTF8StringParserFactory.INSTANCE);
}
- protected static final Map<String, String> formatToParserFactoryMap = initializeFormatParserFactoryMap();
+ protected static final Map<String, Object> formatToParserFactoryMap = initializeFormatParserFactoryMap();
public static final String KEY_FORMAT = "format";
public static final String KEY_PARSER_FACTORY = "parser";
public static final String FORMAT_DELIMITED_TEXT = "delimited-text";
public static final String FORMAT_ADM = "adm";
- private static Map<String, String> initializeFormatParserFactoryMap() {
- Map<String, String> map = new HashMap<String, String>();
+ private static Map<String, Object> initializeFormatParserFactoryMap() {
+ Map<String, Object> map = new HashMap<String, Object>();
map.put(FORMAT_DELIMITED_TEXT, "edu.uci.ics.asterix.runtime.operators.file.NtDelimitedDataTupleParserFactory");
map.put(FORMAT_ADM, "edu.uci.ics.asterix.runtime.operators.file.AdmSchemafullRecordParserFactory");
return map;
@@ -77,7 +77,7 @@
* @param attribute
* The attribute whose value needs to be obtained.
*/
- public String getAdapterProperty(String attribute) {
+ public Object getAdapterProperty(String attribute) {
return configuration.get(attribute);
}
@@ -86,7 +86,7 @@
*
- * @return A Map<String,String> instance representing the adapter configuration.
+ * @return A Map<String, Object> instance representing the adapter configuration.
*/
- public Map<String, String> getConfiguration() {
+ public Map<String, Object> getConfiguration() {
return configuration;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
index 3898f7e..8976f7a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/CNNFeedAdapter.java
@@ -70,9 +70,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
index 9f8cedc..8ab252d 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/FileSystemBasedAdapter.java
@@ -69,7 +69,7 @@
public abstract void initialize(IHyracksTaskContext ctx) throws Exception;
@Override
- public abstract void configure(Map<String, String> arguments) throws Exception;
+ public abstract void configure(Map<String, Object> arguments) throws Exception;
@Override
public abstract AdapterType getAdapterType();
@@ -82,14 +82,14 @@
}
protected void configureFormat() throws Exception {
- String parserFactoryClassname = configuration.get(KEY_PARSER_FACTORY);
+ String parserFactoryClassname = (String) configuration.get(KEY_PARSER_FACTORY);
if (parserFactoryClassname == null) {
- String specifiedFormat = configuration.get(KEY_FORMAT);
+ String specifiedFormat = (String) configuration.get(KEY_FORMAT);
if (specifiedFormat == null) {
throw new IllegalArgumentException(" Unspecified data format");
} else if (FORMAT_DELIMITED_TEXT.equalsIgnoreCase(specifiedFormat)) {
parserFactory = getDelimitedDataTupleParserFactory((ARecordType) atype);
- } else if (FORMAT_ADM.equalsIgnoreCase(configuration.get(KEY_FORMAT))) {
+ } else if (FORMAT_ADM.equalsIgnoreCase((String) configuration.get(KEY_FORMAT))) {
parserFactory = getADMDataTupleParserFactory((ARecordType) atype);
} else {
throw new IllegalArgumentException(" format " + configuration.get(KEY_FORMAT) + " not supported");
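
For reference, the two ways configureFormat resolves a tuple parser factory: a built-in format name, or an explicit parser factory class that bypasses the mapping. The custom class name below is hypothetical:

    Map<String, Object> args = new HashMap<String, Object>();
    args.put(AbstractDatasourceAdapter.KEY_FORMAT, AbstractDatasourceAdapter.FORMAT_DELIMITED_TEXT);
    // ...or supply a parser factory class directly (hypothetical name):
    args.put(AbstractDatasourceAdapter.KEY_PARSER_FACTORY, "com.example.MyTupleParserFactory");
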
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
index 1e05b2f..02eacf5 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HDFSAdapter.java
@@ -14,18 +14,9 @@
*/
package edu.uci.ics.asterix.external.dataset.adapter;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
@@ -37,14 +28,9 @@
import org.apache.hadoop.mapred.TextInputFormat;
import edu.uci.ics.asterix.om.types.IAType;
-import edu.uci.ics.asterix.om.util.AsterixRuntimeUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.hadoop.util.InputSplitsProxy;
/**
* Provides functionality for fetching external data stored in an HDFS instance.
@@ -53,118 +39,29 @@
public class HDFSAdapter extends FileSystemBasedAdapter {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(HDFSAdapter.class.getName());
- public static final String KEY_HDFS_URL = "hdfs";
- public static final String KEY_INPUT_FORMAT = "input-format";
- public static final String INPUT_FORMAT_TEXT = "text-input-format";
- public static final String INPUT_FORMAT_SEQUENCE = "sequence-input-format";
-
- private Object[] inputSplits;
+ private transient String[] readSchedule;
+ private transient boolean executed[];
+ private transient InputSplit[] inputSplits;
private transient JobConf conf;
- private InputSplitsProxy inputSplitsProxy;
- private static final Map<String, String> formatClassNames = initInputFormatMap();
+ private transient AlgebricksPartitionConstraint clusterLocations;
- private static Map<String, String> initInputFormatMap() {
- Map<String, String> formatClassNames = new HashMap<String, String>();
- formatClassNames.put(INPUT_FORMAT_TEXT, "org.apache.hadoop.mapred.TextInputFormat");
- formatClassNames.put(INPUT_FORMAT_SEQUENCE, "org.apache.hadoop.mapred.SequenceFileInputFormat");
- return formatClassNames;
- }
+ private transient String nodeName;
- public HDFSAdapter(IAType atype) {
+ public HDFSAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+ AlgebricksPartitionConstraint clusterLocations) {
super(atype);
+ this.readSchedule = readSchedule;
+ this.executed = executed;
+ this.inputSplits = inputSplits;
+ this.conf = conf;
+ this.clusterLocations = clusterLocations;
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
- configuration = arguments;
+ public void configure(Map<String, Object> arguments) throws Exception {
+ this.configuration = arguments;
configureFormat();
- configureJobConf();
- configureSplits();
- }
-
- private void configureSplits() throws IOException {
- if (inputSplitsProxy == null) {
- inputSplits = conf.getInputFormat().getSplits(conf, 0);
- }
- inputSplitsProxy = new InputSplitsProxy(conf, inputSplits);
- }
-
- private void configurePartitionConstraint() throws Exception {
- List<String> locations = new ArrayList<String>();
- Random random = new Random();
- boolean couldConfigureLocationConstraints = false;
- try {
- Map<String, Set<String>> nodeControllers = AsterixRuntimeUtil.getNodeControllerMap();
- for (Object inputSplit : inputSplits) {
- String[] dataNodeLocations = ((InputSplit) inputSplit).getLocations();
- if (dataNodeLocations == null || dataNodeLocations.length == 0) {
- throw new IllegalArgumentException("No datanode locations found: check hdfs path");
- }
-
- // loop over all replicas until a split location coincides
- // with an asterix datanode location
- for (String datanodeLocation : dataNodeLocations) {
- Set<String> nodeControllersAtLocation = null;
- try {
- nodeControllersAtLocation = nodeControllers.get(AsterixRuntimeUtil
- .getIPAddress(datanodeLocation));
- } catch (UnknownHostException uhe) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "Unknown host :" + datanodeLocation);
- }
- continue;
- }
- if (nodeControllersAtLocation == null || nodeControllersAtLocation.size() == 0) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "No node controller found at " + datanodeLocation
- + " will look at replica location");
- }
- couldConfigureLocationConstraints = false;
- } else {
- int locationIndex = random.nextInt(nodeControllersAtLocation.size());
- String chosenLocation = (String) nodeControllersAtLocation.toArray()[locationIndex];
- locations.add(chosenLocation);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "split : " + inputSplit + " to be processed by :" + chosenLocation);
- }
- couldConfigureLocationConstraints = true;
- break;
- }
- }
-
- /* none of the replica locations coincides with an Asterix
- node controller location.
- */
- if (!couldConfigureLocationConstraints) {
- List<String> allNodeControllers = AsterixRuntimeUtil.getAllNodeControllers();
- int locationIndex = random.nextInt(allNodeControllers.size());
- String chosenLocation = allNodeControllers.get(locationIndex);
- locations.add(chosenLocation);
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "No local node controller found to process split : " + inputSplit
- + " will be processed by a remote node controller:" + chosenLocation);
- }
- }
- }
- partitionConstraint = new AlgebricksAbsolutePartitionConstraint(locations.toArray(new String[] {}));
- } catch (Exception e) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "Encountered exception :" + e + " using count constraints");
- }
- partitionConstraint = new AlgebricksCountPartitionConstraint(inputSplits.length);
- }
- }
-
- private JobConf configureJobConf() throws Exception {
- conf = new JobConf();
- conf.set("fs.default.name", configuration.get(KEY_HDFS_URL).trim());
- conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- conf.setClassLoader(HDFSAdapter.class.getClassLoader());
- conf.set("mapred.input.dir", configuration.get(KEY_PATH).trim());
- conf.set("mapred.input.format.class", formatClassNames.get(configuration.get(KEY_INPUT_FORMAT).trim()));
- return conf;
}
public AdapterType getAdapterType() {
@@ -174,7 +71,7 @@
@Override
public void initialize(IHyracksTaskContext ctx) throws Exception {
this.ctx = ctx;
- inputSplits = inputSplitsProxy.toInputSplits(conf);
+ this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
}
private Reporter getReporter() {
@@ -215,98 +112,124 @@
return reporter;
}
- @SuppressWarnings("unchecked")
@Override
public InputStream getInputStream(int partition) throws IOException {
- try {
- InputStream inputStream;
- if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
- SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
- RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
- inputStream = new HDFSStream(reader, ctx);
- } else {
- try {
+
+ return new InputStream() {
+
+ private RecordReader<Object, Text> reader;
+ private Object key;
+ private Text value;
+ private boolean hasMore = false;
+ private final int EOL = "\n".getBytes()[0];
+ private Text pendingValue = null;
+ private int currentSplitIndex = 0;
+
+ @SuppressWarnings("unchecked")
+ private boolean moveToNext() throws IOException {
+ for (; currentSplitIndex < inputSplits.length; currentSplitIndex++) {
+ /**
+ * read all the partitions scheduled to the current node
+ */
+ if (readSchedule[currentSplitIndex].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (!executed[currentSplitIndex]) {
+ executed[currentSplitIndex] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ reader = getRecordReader(currentSplitIndex);
+ key = reader.createKey();
+ value = (Text) reader.createValue();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int len) throws IOException {
+ if (reader == null) {
+ if (!moveToNext()) {
+ //nothing to read
+ return -1;
+ }
+ }
+
+ int numBytes = 0;
+ if (pendingValue != null) {
+ System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
+ buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
+ numBytes += pendingValue.getLength() + 1;
+ pendingValue = null;
+ }
+
+ while (numBytes < len) {
+ hasMore = reader.next(key, value);
+ if (!hasMore) {
+ while (moveToNext()) {
+ hasMore = reader.next(key, value);
+ if (hasMore) {
+ //move to the next non-empty split
+ break;
+ }
+ }
+ }
+ if (!hasMore) {
+ return (numBytes == 0) ? -1 : numBytes;
+ }
+ int sizeOfNextTuple = value.getLength() + 1;
+ if (numBytes + sizeOfNextTuple > len) {
+ // the tuple does not fit in the current buffer, but the reader
+ // has already moved past it; store it and return it from a
+ // subsequent read call
+ pendingValue = value;
+ break;
+ } else {
+ System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
+ buffer[offset + numBytes + value.getLength()] = (byte) EOL;
+ numBytes += sizeOfNextTuple;
+ }
+ }
+ return numBytes;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new NotImplementedException("Use read(byte[], int, int)");
+ }
+
+ private RecordReader getRecordReader(int splitIndex) throws IOException {
+ if (conf.getInputFormat() instanceof SequenceFileInputFormat) {
+ SequenceFileInputFormat format = (SequenceFileInputFormat) conf.getInputFormat();
+ RecordReader reader = format.getRecordReader(
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+ return reader;
+ } else {
TextInputFormat format = (TextInputFormat) conf.getInputFormat();
RecordReader reader = format.getRecordReader(
- (org.apache.hadoop.mapred.FileSplit) inputSplits[partition], conf, getReporter());
- inputStream = new HDFSStream(reader, ctx);
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e);
+ (org.apache.hadoop.mapred.FileSplit) inputSplits[splitIndex], conf, getReporter());
+ return reader;
}
}
- return inputStream;
- } catch (Exception e) {
- throw new IOException(e);
- }
+
+ };
}
@Override
public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
- if (partitionConstraint == null) {
- configurePartitionConstraint();
- }
- return partitionConstraint;
- }
-
-}
-
-class HDFSStream extends InputStream {
-
- private RecordReader<Object, Text> reader;
- private final Object key;
- private final Text value;
- private boolean hasMore = false;
- private static final int EOL = "\n".getBytes()[0];
- private Text pendingValue = null;
-
- public HDFSStream(RecordReader<Object, Text> reader, IHyracksTaskContext ctx) throws Exception {
- this.reader = reader;
- key = reader.createKey();
- try {
- value = (Text) reader.createValue();
- } catch (ClassCastException cce) {
- throw new Exception("value is not of type org.apache.hadoop.io.Text"
- + " type not supported in sequence file format", cce);
- }
- }
-
- @Override
- public int read(byte[] buffer, int offset, int len) throws IOException {
- int numBytes = 0;
- if (pendingValue != null) {
- System.arraycopy(pendingValue.getBytes(), 0, buffer, offset + numBytes, pendingValue.getLength());
- buffer[offset + numBytes + pendingValue.getLength()] = (byte) EOL;
- numBytes += pendingValue.getLength() + 1;
- pendingValue = null;
- }
-
- while (numBytes < len) {
- hasMore = reader.next(key, value);
- if (!hasMore) {
- return (numBytes == 0) ? -1 : numBytes;
- }
- int sizeOfNextTuple = value.getLength() + 1;
- if (numBytes + sizeOfNextTuple > len) {
- // cannot add tuple to current buffer
- // but the reader has moved pass the fetched tuple
- // we need to store this for a subsequent read call.
- // and return this then.
- pendingValue = value;
- break;
- } else {
- System.arraycopy(value.getBytes(), 0, buffer, offset + numBytes, value.getLength());
- buffer[offset + numBytes + value.getLength()] = (byte) EOL;
- numBytes += sizeOfNextTuple;
- }
- }
- return numBytes;
- }
-
- @Override
- public int read() throws IOException {
- throw new NotImplementedException("Use read(byte[], int, int");
+ return clusterLocations;
}
}
\ No newline at end of file
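
Consumption sketch for the stream above: because each Text record is copied out followed by a re-inserted '\n', a caller can read the adapter's output line by line (hdfsAdapter and partition are placeholders):

    InputStream in = hdfsAdapter.getInputStream(partition);
    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    String record;
    while ((record = reader.readLine()) != null) {
        // one record from a split scheduled to this node
    }
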
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
index 3731eba..7f15a09 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/HiveAdapter.java
@@ -16,6 +16,9 @@
import java.util.Map;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,6 +27,7 @@
/**
* Provides the functionality of fetching data in form of ADM records from a Hive dataset.
*/
+@SuppressWarnings("deprecation")
public class HiveAdapter extends AbstractDatasourceAdapter {
private static final long serialVersionUID = 1L;
@@ -37,8 +41,9 @@
private HDFSAdapter hdfsAdapter;
- public HiveAdapter(IAType atype) {
- this.hdfsAdapter = new HDFSAdapter(atype);
+ public HiveAdapter(IAType atype, String[] readSchedule, boolean[] executed, InputSplit[] inputSplits, JobConf conf,
+ AlgebricksPartitionConstraint clusterLocations) {
+ this.hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
this.atype = atype;
}
@@ -48,33 +53,8 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
- configuration = arguments;
- configureHadoopAdapter();
- }
+ public void configure(Map<String, Object> arguments) throws Exception {
- private void configureHadoopAdapter() throws Exception {
- String database = configuration.get(HIVE_DATABASE);
- String tablePath = null;
- if (database == null) {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + configuration.get(HIVE_TABLE);
- } else {
- tablePath = configuration.get(HIVE_WAREHOUSE_DIR) + "/" + tablePath + ".db" + "/"
- + configuration.get(HIVE_TABLE);
- }
- configuration.put(HDFSAdapter.KEY_PATH, tablePath);
- if (!configuration.get(KEY_FORMAT).equals(FORMAT_DELIMITED_TEXT)) {
- throw new IllegalArgumentException("format" + configuration.get(KEY_FORMAT) + " is not supported");
- }
-
- if (!(configuration.get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_TEXT) || configuration
- .get(HDFSAdapter.KEY_INPUT_FORMAT).equals(HDFSAdapter.INPUT_FORMAT_SEQUENCE))) {
- throw new IllegalArgumentException("file input format" + configuration.get(HDFSAdapter.KEY_INPUT_FORMAT)
- + " is not supported");
- }
-
- hdfsAdapter = new HDFSAdapter(atype);
- hdfsAdapter.configure(configuration);
}
@Override
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
index b0dc32f..031f34f 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/IDatasourceAdapter.java
@@ -70,7 +70,7 @@
- * @return String the value corresponding to the configuration parameter
- * represented by the key- attributeKey.
+ * @return Object the value corresponding to the configuration parameter
+ * represented by the key attributeKey.
*/
- public String getAdapterProperty(String propertyKey);
+ public Object getAdapterProperty(String propertyKey);
/**
* Configures the IDatasourceAdapter instance.
@@ -100,7 +100,7 @@
* providing all arguments as a set of (key,value) pairs. These
* arguments are put into the metadata.
*/
- public void configure(Map<String, String> arguments) throws Exception;
+ public void configure(Map<String, Object> arguments) throws Exception;
/**
* Returns a list of partition constraints. A partition constraint can be a
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
index ef39d45..9abc92a 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/NCFileSystemAdapter.java
@@ -44,9 +44,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
this.configuration = arguments;
- String[] splits = arguments.get(KEY_PATH).split(",");
+ String[] splits = ((String) arguments.get(KEY_PATH)).split(",");
configureFileSplits(splits);
configureFormat();
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
index ebfbcad..66d9f98 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterAdapter.java
@@ -31,9 +31,8 @@
*/
public class PullBasedTwitterAdapter extends PullBasedAdapter implements IManagedFeedAdapter {
-
private static final long serialVersionUID = 1L;
-
+
public static final String QUERY = "query";
public static final String INTERVAL = "interval";
@@ -49,7 +48,7 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
String[] fieldNames = { "id", "username", "location", "text", "timestamp" };
IAType[] fieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING,
@@ -69,12 +68,12 @@
}
@Override
- public void stop() {
+ public void stop() {
tweetClient.stop();
}
@Override
- public void alter(Map<String, String> properties) {
+ public void alter(Map<String, String> properties) {
alterRequested = true;
this.alteredParams = properties;
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
index 2a07472..06cddfd 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedTwitterFeedClient.java
@@ -63,8 +63,8 @@
tupleFieldValues = new String[recordType.getFieldNames().length];
}
- public void initialize(Map<String, String> params) {
- this.keywords = params.get(PullBasedTwitterAdapter.QUERY);
+ public void initialize(Map<String, Object> params) {
+ this.keywords = (String) params.get(PullBasedTwitterAdapter.QUERY);
this.query = new Query(keywords);
query.setRpp(100);
}
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
index 611183c..ccd6516 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/RSSFeedAdapter.java
@@ -72,9 +72,9 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
configuration = arguments;
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty == null) {
throw new IllegalArgumentException("no rss url provided");
}
@@ -94,7 +94,7 @@
}
protected void reconfigure(Map<String, String> arguments) {
- String rssURLProperty = configuration.get(KEY_RSS_URL);
+ String rssURLProperty = (String) configuration.get(KEY_RSS_URL);
if (rssURLProperty != null) {
initializeFeedURLs(rssURLProperty);
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 117f492..29bafa9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -34,6 +34,7 @@
import edu.uci.ics.asterix.common.transactions.IResourceManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
+import edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
@@ -101,6 +102,7 @@
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
@@ -119,6 +121,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
@@ -157,6 +160,7 @@
private final AsterixStorageProperties storageProperties;
private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+ private static Scheduler hdfsScheduler;
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
@@ -178,6 +182,16 @@
this.defaultDataverse = defaultDataverse;
this.stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+ ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+ try {
+ if (hdfsScheduler == null) {
+ // set the singleton HDFS scheduler
+ hdfsScheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), ccContext
+ .getClusterControllerInfo().getClientNetPort());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
public void setJobId(JobId jobId) {
@@ -343,8 +357,8 @@
adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
- adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties(),
- itemType);
+ adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
+ wrapProperties(datasetDetails.getProperties()), itemType);
} catch (AlgebricksException ae) {
throw ae;
} catch (Exception e) {
@@ -362,7 +376,7 @@
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
- adapterFactoryClassname, datasetDetails.getProperties(), rt, scannerDesc);
+ wrapPropertiesEmpty(datasetDetails.getProperties()), rt, scannerDesc, adapterFactory);
AlgebricksPartitionConstraint constraint;
try {
@@ -427,14 +441,15 @@
}
if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
- adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties());
+ adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(wrapProperties(datasetDetails
+ .getProperties()));
adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
} else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
outputTypeName).getDatatype();
adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
- datasetDetails.getProperties(), adapterOutputType);
+ wrapProperties(datasetDetails.getProperties()), adapterOutputType);
} else {
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
}
@@ -451,7 +466,8 @@
FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
- datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
+ this.wrapPropertiesEmpty(datasetDetails.getProperties()), (ARecordType) adapterOutputType, feedDesc,
+ adapterFactory);
AlgebricksPartitionConstraint constraint = null;
try {
@@ -1484,4 +1500,32 @@
return FormatUtils.getDefaultFormat();
}
+ /**
+ * Wrap the original dataset properties, adding the shared HDFS scheduler and the cluster location constraints
+ *
+ * @param properties
+ * the original dataset properties
+ * @return a new map containing the original dataset properties and the scheduler/locations
+ */
+ private Map<String, Object> wrapProperties(Map<String, String> properties) {
+ Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+ wrappedProperties.putAll(properties);
+ wrappedProperties.put(HDFSAdapterFactory.SCHEDULER, hdfsScheduler);
+ wrappedProperties.put(HDFSAdapterFactory.CLUSTER_LOCATIONS, getClusterLocations());
+ return wrappedProperties;
+ }
+
+ /**
+ * Adapt the original properties to a string-object map
+ *
+ * @param properties
+ * the original properties
+ * @return the new string-object map
+ */
+ private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
+ Map<String, Object> wrappedProperties = new HashMap<String, Object>();
+ wrappedProperties.putAll(properties);
+ return wrappedProperties;
+ }
+
}
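The two wrappers added at the bottom of AqlMetadataProvider are the bridge between the string-typed metadata properties and the object-typed adapter configuration; wrapProperties additionally injects the shared HDFS scheduler and the cluster locations into the map. A standalone sketch of that shape, with assumed key names standing in for HDFSAdapterFactory's constants and Object standing in for the Scheduler and location-constraint types:

    import java.util.HashMap;
    import java.util.Map;

    // Standalone sketch of the property-wrapping step; key names are
    // assumptions, not the real HDFSAdapterFactory constants.
    public class PropertyWrapper {
        static final String SCHEDULER = "hdfs-scheduler";            // assumed key
        static final String CLUSTER_LOCATIONS = "cluster-locations"; // assumed key

        static Map<String, Object> wrap(Map<String, String> properties,
                Object scheduler, Object clusterLocations) {
            Map<String, Object> wrapped = new HashMap<String, Object>();
            wrapped.putAll(properties); // string entries copy through unchanged
            wrapped.put(SCHEDULER, scheduler);
            wrapped.put(CLUSTER_LOCATIONS, clusterLocations);
            return wrapped;
        }
    }

One design note: the null check that lazily builds the static hdfsScheduler in the constructor is not synchronized, so two providers constructed concurrently could each create a Scheduler; whether that matters depends on whether the scheduler holds per-connection state.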
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
index 84b989d..afdf343 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapter.java
@@ -19,7 +19,6 @@
import java.util.Map;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
-import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
import edu.uci.ics.asterix.feed.managed.adapter.IManagedFeedAdapter;
@@ -47,48 +46,15 @@
IManagedFeedAdapter {
private static final long serialVersionUID = 1L;
+ private FileSystemBasedAdapter coreAdapter;
+ private String format;
- public static final String KEY_FILE_SYSTEM = "fs";
- public static final String LOCAL_FS = "localfs";
- public static final String HDFS = "hdfs";
-
- private final FileSystemBasedAdapter coreAdapter;
- private final Map<String, String> configuration;
- private final String fileSystem;
- private final String format;
-
- public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, String> configuration) throws Exception {
+ public RateControlledFileSystemBasedAdapter(ARecordType atype, Map<String, Object> configuration,
+ FileSystemBasedAdapter coreAdapter, String format) throws Exception {
super(atype);
- checkRequiredArgs(configuration);
- fileSystem = configuration.get(KEY_FILE_SYSTEM);
- String adapterFactoryClass = null;
- if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
- adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
- } else if (fileSystem.equals(HDFS)) {
- adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
- } else {
- throw new AsterixException("Unsupported file system type " + fileSystem);
- }
- format = configuration.get(KEY_FORMAT);
- IGenericDatasetAdapterFactory adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(
- adapterFactoryClass).newInstance();
- coreAdapter = (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, atype);
this.configuration = configuration;
- }
-
- private void checkRequiredArgs(Map<String, String> configuration) throws Exception {
- if (configuration.get(KEY_FILE_SYSTEM) == null) {
- throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
- }
- if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
- throw new Exception("Record type not specified (output-type-name=?)");
- }
- if (configuration.get(KEY_PATH) == null) {
- throw new Exception("File path not specified (path=?)");
- }
- if (configuration.get(KEY_FORMAT) == null) {
- throw new Exception("File format not specified (format=?)");
- }
+ this.coreAdapter = coreAdapter;
+ this.format = format;
}
@Override
@@ -103,7 +69,7 @@
}
@Override
- public void configure(Map<String, String> arguments) throws Exception {
+ public void configure(Map<String, Object> arguments) throws Exception {
coreAdapter.configure(arguments);
}
@@ -189,16 +155,16 @@
private final ARecordType recordType;
private final IDataParser dataParser;
- private final Map<String, String> configuration;
+ private final Map<String, Object> configuration;
public RateControlledTupleParserFactory(ARecordType recordType, IValueParserFactory[] valueParserFactories,
- char fieldDelimiter, Map<String, String> configuration) {
+ char fieldDelimiter, Map<String, Object> configuration) {
this.recordType = recordType;
dataParser = new DelimitedDataParser(recordType, valueParserFactories, fieldDelimiter);
this.configuration = configuration;
}
- public RateControlledTupleParserFactory(ARecordType recordType, Map<String, String> configuration) {
+ public RateControlledTupleParserFactory(ARecordType recordType, Map<String, Object> configuration) {
this.recordType = recordType;
dataParser = new ADMDataParser();
this.configuration = configuration;
@@ -221,10 +187,10 @@
public static final String INTER_TUPLE_INTERVAL = "tuple-interval";
public RateControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, IDataParser dataParser,
- Map<String, String> configuration) {
+ Map<String, Object> configuration) {
super(ctx, recType);
this.dataParser = dataParser;
- String propValue = configuration.get(INTER_TUPLE_INTERVAL);
+ String propValue = (String) configuration.get(INTER_TUPLE_INTERVAL);
if (propValue != null) {
interTupleInterval = Long.parseLong(propValue);
} else {
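The net effect of this refactor is that RateControlledFileSystemBasedAdapter becomes a pure decorator: the factory now resolves and constructs the core file-system adapter, and the adapter merely delegates to it. The shape, reduced to hypothetical minimal interfaces:

    import java.util.Map;

    // Hypothetical, reduced sketch of the decorator shape this refactor
    // adopts: the core adapter is built elsewhere and handed in, and the
    // decorator only delegates.
    interface SimpleAdapter {
        void configure(Map<String, Object> arguments) throws Exception;
    }

    class RateControlledSketch implements SimpleAdapter {
        private final SimpleAdapter coreAdapter;

        RateControlledSketch(SimpleAdapter coreAdapter) {
            this.coreAdapter = coreAdapter;
        }

        @Override
        public void configure(Map<String, Object> arguments) throws Exception {
            // Delegate configuration; rate control is applied at parse
            // time by the tuple parser, not here.
            coreAdapter.configure(arguments);
        }
    }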
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
index 6c32acb..bf1c268 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/RateControlledFileSystemBasedAdapterFactory.java
@@ -16,7 +16,9 @@
import java.util.Map;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
+import edu.uci.ics.asterix.external.dataset.adapter.FileSystemBasedAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
@@ -28,10 +30,37 @@
* source file has been ingested.
*/
public class RateControlledFileSystemBasedAdapterFactory implements IGenericDatasetAdapterFactory {
+ private static final long serialVersionUID = 1L;
+
+ public static final String KEY_FILE_SYSTEM = "fs";
+ public static final String LOCAL_FS = "localfs";
+ public static final String HDFS = "hdfs";
+ public static final String KEY_PATH = "path";
+ public static final String KEY_FORMAT = "format";
+
+ private IGenericDatasetAdapterFactory adapterFactory;
+ private String format;
+ private boolean setup = false;
@Override
- public IDatasourceAdapter createAdapter(Map<String, String> configuration, IAType type) throws Exception {
- return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration);
+ public IDatasourceAdapter createAdapter(Map<String, Object> configuration, IAType type) throws Exception {
+ if (!setup) {
+ checkRequiredArgs(configuration);
+ String fileSystem = (String) configuration.get(KEY_FILE_SYSTEM);
+ String adapterFactoryClass = null;
+ if (fileSystem.equalsIgnoreCase(LOCAL_FS)) {
+ adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory";
+ } else if (fileSystem.equals(HDFS)) {
+ adapterFactoryClass = "edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory";
+ } else {
+ throw new AsterixException("Unsupported file system type " + fileSystem);
+ }
+ format = (String) configuration.get(KEY_FORMAT);
+ adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClass).newInstance();
+ setup = true;
+ }
+ return new RateControlledFileSystemBasedAdapter((ARecordType) type, configuration,
+ (FileSystemBasedAdapter) adapterFactory.createAdapter(configuration, type), format);
}
@Override
@@ -39,4 +68,19 @@
return "file_feed";
}
+ private void checkRequiredArgs(Map<String, Object> configuration) throws Exception {
+ if (configuration.get(KEY_FILE_SYSTEM) == null) {
+ throw new Exception("File system type not specified. (fs=?) File system could be 'localfs' or 'hdfs'");
+ }
+ if (configuration.get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME) == null) {
+ throw new Exception("Record type not specified (output-type-name=?)");
+ }
+ if (configuration.get(KEY_PATH) == null) {
+ throw new Exception("File path not specified (path=?)");
+ }
+ if (configuration.get(KEY_FORMAT) == null) {
+ throw new Exception("File format not specified (format=?)");
+ }
+ }
+
}
\ No newline at end of file
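One caveat with the factory's new lazy setup: the boolean guard is neither volatile nor synchronized, so concurrent first calls to createAdapter could run checkRequiredArgs and the reflective factory construction more than once. If the factory can be shared across threads, a synchronized guard is a cheap fix; a sketch under the same simplified assumptions, with an assumed configuration key:

    import java.util.Map;

    // Hypothetical thread-safe variant of the factory's lazy setup.
    public class LazySetupSketch {
        private boolean setup = false;
        private Object adapterFactory;

        private synchronized void ensureSetup(Map<String, Object> configuration) throws Exception {
            if (!setup) {
                // Validate required arguments, then resolve the factory
                // class named in the configuration ("factory-class" is an
                // assumed key for this sketch).
                adapterFactory = Class.forName(
                        (String) configuration.get("factory-class")).newInstance();
                setup = true;
            }
        }
    }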