clean up the fix for issue 349

Compute the number of HDFS input splits from the cluster location constraint
instead of a hard-coded 0, create the ConfFactory from the configured JobConf,
mark the factory as set up, and return a HiveAdapter rather than an HDFSAdapter;
also document the property-wrapping helpers in AqlMetadataProvider.
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
index 176c2cd..1845c07 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/adapter/factory/HiveAdapterFactory.java
@@ -22,8 +22,10 @@
import org.apache.hadoop.mapred.JobConf;
import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
+import edu.uci.ics.asterix.external.dataset.adapter.HiveAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
import edu.uci.ics.hyracks.hdfs.dataflow.InputSplitsFactory;
@@ -68,18 +70,24 @@
/** set up the factory --serializable stuff --- this if-block should be called only once for each factory instance */
configureJobConf(configuration);
JobConf conf = configureJobConf(configuration);
+ confFactory = new ConfFactory(conf);
- InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, 0);
+ clusterLocations = (AlgebricksPartitionConstraint) configuration.get(CLUSTER_LOCATIONS);
+ int numPartitions = ((AlgebricksAbsolutePartitionConstraint) clusterLocations).getLocations().length;
+
+ InputSplit[] inputSplits = conf.getInputFormat().getSplits(conf, numPartitions);
inputSplitsFactory = new InputSplitsFactory(inputSplits);
Scheduler scheduler = (Scheduler) configuration.get(SCHEDULER);
readSchedule = scheduler.getLocationConstraints(inputSplits);
executed = new boolean[readSchedule.length];
Arrays.fill(executed, false);
+
+ setup = true;
}
JobConf conf = confFactory.getConf();
InputSplit[] inputSplits = inputSplitsFactory.getSplits();
- HDFSAdapter hdfsAdapter = new HDFSAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
+ HiveAdapter hdfsAdapter = new HiveAdapter(atype, readSchedule, executed, inputSplits, conf, clusterLocations);
return hdfsAdapter;
}
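For reviewers, a minimal sketch of the split-count logic the hunk above introduces (not part of the patch): the SplitCountSketch class and its signature are illustrative only, assuming the caller already holds the configured JobConf and the cluster location constraint.

import java.io.IOException;

import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;

public class SplitCountSketch {
    // Ask the configured InputFormat for one split hint per cluster partition,
    // instead of the hard-coded 0 that the removed line passed.
    public static InputSplit[] splitsForCluster(JobConf conf, AlgebricksPartitionConstraint locations)
            throws IOException {
        // The patch assumes an absolute constraint, i.e. an explicit list of node locations.
        String[] nodes = ((AlgebricksAbsolutePartitionConstraint) locations).getLocations();
        return conf.getInputFormat().getSplits(conf, nodes.length);
    }
}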
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 659a3c3..29bafa9 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -1500,6 +1500,13 @@
return FormatUtils.getDefaultFormat();
}
+ /**
+ * Add the HDFS scheduler and the cluster location constraint to the dataset properties
+ *
+ * @param properties
+ * the original dataset properties
+ * @return a new map containing the original dataset properties and the scheduler/locations
+ */
private Map<String, Object> wrapProperties(Map<String, String> properties) {
Map<String, Object> wrappedProperties = new HashMap<String, Object>();
wrappedProperties.putAll(properties);
@@ -1508,6 +1515,13 @@
return wrappedProperties;
}
+ /**
+ * Adapt the original properties to a string-object map
+ *
+ * @param properties
+ * the original properties
+ * @return the new string-object map
+ */
private Map<String, Object> wrapPropertiesEmpty(Map<String, String> properties) {
Map<String, Object> wrappedProperties = new HashMap<String, Object>();
wrappedProperties.putAll(properties);
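For reviewers, a small sketch of the string-to-object wrapping the newly documented helpers perform (not part of the patch); the WrapSketch class is illustrative. In wrapProperties the scheduler and cluster locations are then put into the same map, while wrapPropertiesEmpty returns the copy as-is.

import java.util.HashMap;
import java.util.Map;

public class WrapSketch {
    // Copy the String-valued dataset properties into a String -> Object map so that
    // non-string values (e.g. a scheduler instance or a partition constraint)
    // can be added to the same map afterwards.
    public static Map<String, Object> wrap(Map<String, String> properties) {
        Map<String, Object> wrapped = new HashMap<String, Object>();
        wrapped.putAll(properties); // every String value is also an Object
        return wrapped;
    }
}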