add LSM support in pregelix
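
Alongside the LSM work in Pregelix that this commit is named for, the hunks below
adjust the Hivesterix driver path: HyracksExecutionEngine.compileJob now returns a
distinct code (2) when the translated plan is not a map-reduce task DAG, and Driver
marks a query as a hivesterix run only when compilation succeeds (code 0), so
everything else falls back to Hive's native execution path.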
diff --git a/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index 0c2416d..d3bcaca 100644
--- a/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -12,598 +12,612 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hivesterix.runtime.exec;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-
-import edu.uci.ics.hivesterix.common.config.ConfUtil;
-import edu.uci.ics.hivesterix.logical.expression.HiveExpressionTypeComputer;
-import edu.uci.ics.hivesterix.logical.expression.HiveMergeAggregationExpressionFactory;
-import edu.uci.ics.hivesterix.logical.expression.HiveNullableTypeComputer;
-import edu.uci.ics.hivesterix.logical.expression.HivePartialAggregationTypeComputer;
-import edu.uci.ics.hivesterix.logical.plan.HiveAlgebricksTranslator;
-import edu.uci.ics.hivesterix.logical.plan.HiveLogicalPlanAndMetaData;
-import edu.uci.ics.hivesterix.optimizer.rulecollections.HiveRuleCollections;
-import edu.uci.ics.hivesterix.runtime.factory.evaluator.HiveExpressionRuntimeProvider;
-import edu.uci.ics.hivesterix.runtime.factory.nullwriter.HiveNullWriterFactory;
-import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryBooleanInspectorFactory;
-import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryIntegerInspectorFactory;
-import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy.Policy;
-import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryComparatorFactoryProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFactoryProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFamilyProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveNormalizedKeyComputerFactoryProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HivePrinterFactoryProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveSerializerDeserializerProvider;
-import edu.uci.ics.hivesterix.runtime.provider.HiveTypeTraitProvider;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
-import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder.DefaultOptimizationContextFactory;
-import edu.uci.ics.hyracks.algebricks.compiler.api.ICompiler;
-import edu.uci.ics.hyracks.algebricks.compiler.api.ICompilerFactory;
-import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
-import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
-import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
-import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class HyracksExecutionEngine implements IExecutionEngine {
-
- private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
- private static final String clusterPropertiesPath = "conf/cluster.properties";
- private static final String masterFilePath = "conf/master";
-
- private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
- private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
- static {
- SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
- SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);
- SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
- DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
- HiveRuleCollections.NORMALIZATION));
- DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
- HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));
- DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
- HiveRuleCollections.LOAD_FIELDS));
- DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
- HiveRuleCollections.OP_PUSHDOWN));
- DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
- HiveRuleCollections.DATA_EXCHANGE));
- DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
- HiveRuleCollections.CONSOLIDATION));
-
- DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
- HiveRuleCollections.PHYSICAL_PLAN_REWRITES));
- DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
- HiveRuleCollections.prepareJobGenRules));
- }
-
- /**
- * static configurations for compiler
- */
- private HeuristicCompilerFactoryBuilder builder;
-
- /**
- * compiler
- */
- private ICompiler compiler;
-
- /**
- * physical optimization config
- */
- private PhysicalOptimizationConfig physicalOptimizationConfig;
-
- /**
-     * leaf (terminal) operators of the operator DAG
- */
- private List<Operator> leaveOps = new ArrayList<Operator>();
-
- /**
- * tasks that are already visited
- */
- private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();
-
- /**
- * hyracks job spec
- */
- private JobSpecification jobSpec;
-
- /**
- * hive configuration
- */
- private HiveConf conf;
-
- /**
- * plan printer
- */
- private PrintWriter planPrinter;
-
- /**
-     * cluster properties
- */
- private Properties clusterProps;
-
- /**
- * the Hyracks client connection
- */
- private IHyracksClientConnection hcc;
-
- public HyracksExecutionEngine(HiveConf conf) {
- this.conf = conf;
- init(conf);
- }
-
- public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {
- this.conf = conf;
- this.planPrinter = planPrinter;
- init(conf);
- }
-
- private void init(HiveConf conf) {
- builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);
- builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);
- builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);
- builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);
- builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);
- builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);
-
- long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);
- long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);
- int frameSize = conf.getInt("hive.algebricks.framesize", 32768);
-
- physicalOptimizationConfig = new PhysicalOptimizationConfig();
- int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);
- physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);
- int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);
- physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);
- builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);
- }
-
- @Override
- public int compileJob(List<Task<? extends Serializable>> rootTasks) {
- // clean up
- leaveOps.clear();
- tasksVisited.clear();
- jobSpec = null;
-
- HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();
- List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);
-
-        // get all leaf ops
- getLeaves(rootOps, leaveOps);
-
- HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();
- try {
- translator.translate(rootOps, null, aliasToPath);
-
- ILogicalPlan plan = translator.genLogicalPlan();
-
- if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {
- translator.printOperators();
- ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,
- translator.getMetadataProvider());
-
- ICompilerFactory compilerFactory = builder.create();
- compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),
- planAndMetadata.getMetadataProvider(), translator.getVariableCounter());
-
- // run optimization and re-writing rules for Hive plan
- compiler.optimize();
-
- // print optimized plan
- LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
- StringBuilder buffer = new StringBuilder();
- PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
- String planStr = buffer.toString();
- System.out.println(planStr);
-
- if (planPrinter != null)
- planPrinter.print(planStr);
- }
- } catch (Exception e) {
- e.printStackTrace();
- return 1;
- }
-
- return 0;
- }
-
- private void codeGen() throws AlgebricksException {
- try {
-            // set the cluster locations to the node controllers (NCs) of the cluster
- builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));
- } catch (Exception e) {
- throw new AlgebricksException(e);
- }
- // builder.setClusterTopology(ConfUtil.getClusterTopology());
- builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);
- builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);
- builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);
- builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);
- builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
- builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);
- builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);
- builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);
- builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);
- builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);
- builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);
- builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);
-
- jobSpec = compiler.createJob(null, null);
-
- // set the policy
- String policyStr = conf.get("hive.hyracks.connectorpolicy");
- if (policyStr == null)
- policyStr = "PIPELINING";
- Policy policyValue = Policy.valueOf(policyStr);
- jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));
- jobSpec.setUseConnectorPolicyForScheduling(false);
- }
-
- @Override
- public int executeJob() {
- try {
- codeGen();
- executeHyracksJob(jobSpec);
- } catch (Exception e) {
- e.printStackTrace();
- return 1;
- }
- return 0;
- }
-
- private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,
- HashMap<String, PartitionDesc> aliasToPath) {
-
- List<Operator> rootOps = new ArrayList<Operator>();
- List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();
- tasksVisited.clear();
-
- for (int i = rootTasks.size() - 1; i >= 0; i--) {
- /**
- * list of map-reduce tasks
- */
- Task<? extends Serializable> task = rootTasks.get(i);
-
- if (task instanceof MapRedTask) {
- List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);
- if (i == 0)
- rootOps.addAll(mapRootOps);
- else {
- List<Operator> leaves = new ArrayList<Operator>();
- getLeaves(rootOps, leaves);
-
- List<Operator> mapChildren = new ArrayList<Operator>();
- for (Operator childMap : mapRootOps) {
- if (childMap instanceof TableScanOperator) {
- TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
- if (topDesc == null)
- mapChildren.add(childMap);
- else {
- rootOps.add(childMap);
- }
- } else
- mapChildren.add(childMap);
- }
-
- if (mapChildren.size() > 0) {
- for (Operator leaf : leaves)
- leaf.setChildOperators(mapChildren);
- for (Operator child : mapChildren)
- child.setParentOperators(leaves);
- }
- }
-
- MapredWork mr = (MapredWork) task.getWork();
- HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
-
- addAliasToPartition(aliasToPath, map);
- toDelete.add(task);
- }
- }
-
- for (Task<? extends Serializable> task : toDelete)
- rootTasks.remove(task);
-
- return rootOps;
- }
-
- private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {
- Iterator<String> keys = map.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
- PartitionDesc part = map.get(key);
- String[] names = key.split(":");
- for (String name : names) {
- aliasToPath.put(name, part);
- }
- }
- }
-
- private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,
- HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {
- // System.out.println("!"+task.getName());
- if (!(task instanceof MapRedTask)) {
- if (!(task instanceof ConditionalTask)) {
- rootTasks.add(task);
- return null;
- } else {
-                // recurse into the last map-reduce branch of the conditional task, if any
- ConditionalTask condition = (ConditionalTask) task;
- List<Task<? extends Serializable>> branches = condition.getListTasks();
- for (int i = branches.size() - 1; i >= 0; i--) {
- Task branch = branches.get(i);
- if (branch instanceof MapRedTask) {
- return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);
- }
- }
- rootTasks.add(task);
- return null;
- }
- }
-
- MapredWork mr = (MapredWork) task.getWork();
- HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
-
-        // put all alias-to-PartitionDesc mappings into the map
- addAliasToPartition(aliasToPath, map);
-
- MapRedTask mrtask = (MapRedTask) task;
- MapredWork work = (MapredWork) mrtask.getWork();
- HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();
-
- Set entries = operators.entrySet();
- Iterator<Entry<String, Operator>> iterator = entries.iterator();
- List<Operator> mapRootOps = new ArrayList<Operator>();
-
- // get map root operators
- while (iterator.hasNext()) {
- Operator next = iterator.next().getValue();
- if (!mapRootOps.contains(next)) {
-                // skip duplicates, which only arise in the union case
- mapRootOps.add(next);
- }
- }
-
- // get map local work
- MapredLocalWork localWork = work.getMapLocalWork();
- if (localWork != null) {
- HashMap<String, Operator<? extends Serializable>> localOperators = localWork.getAliasToWork();
-
- Set localEntries = localOperators.entrySet();
- Iterator<Entry<String, Operator>> localIterator = localEntries.iterator();
- while (localIterator.hasNext()) {
- mapRootOps.add(localIterator.next().getValue());
- }
-
- HashMap<String, FetchWork> localFetch = localWork.getAliasToFetchWork();
- Set localFetchEntries = localFetch.entrySet();
- Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries.iterator();
- while (localFetchIterator.hasNext()) {
- Entry<String, FetchWork> fetchMap = localFetchIterator.next();
- FetchWork fetch = fetchMap.getValue();
- String alias = fetchMap.getKey();
- List<PartitionDesc> dirPart = fetch.getPartDesc();
-
- // temporary hack: put the first partitionDesc into the map
- aliasToPath.put(alias, dirPart.get(0));
- }
- }
-
- Boolean visited = tasksVisited.get(task);
- if (visited != null && visited.booleanValue() == true) {
- return mapRootOps;
- }
-
-        // clear stale parents; this is needed only for the union operator
- for (Operator op : mapRootOps)
- if (op.getParentOperators() != null)
- op.getParentOperators().clear();
-
- List<Operator> mapLeaves = new ArrayList<Operator>();
- downToLeaves(mapRootOps, mapLeaves);
- List<Operator> reduceOps = new ArrayList<Operator>();
-
- if (work.getReducer() != null)
- reduceOps.add(work.getReducer());
-
- for (Operator mapLeaf : mapLeaves) {
- mapLeaf.setChildOperators(reduceOps);
- }
-
- for (Operator reduceOp : reduceOps) {
- if (reduceOp != null)
- reduceOp.setParentOperators(mapLeaves);
- }
-
- List<Operator> leafs = new ArrayList<Operator>();
- if (reduceOps.size() > 0) {
- downToLeaves(reduceOps, leafs);
- } else {
- leafs = mapLeaves;
- }
-
- List<Operator> mapChildren = new ArrayList<Operator>();
- if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {
- for (Object child : task.getChildTasks()) {
- List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);
- if (childMapOps == null)
- continue;
-
- for (Operator childMap : childMapOps) {
- if (childMap instanceof TableScanOperator) {
- TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
- if (topDesc == null)
- mapChildren.add(childMap);
- else {
- rootOps.add(childMap);
- }
- } else {
- // if not table scan, add the child
- mapChildren.add(childMap);
- }
- }
- }
-
- if (mapChildren.size() > 0) {
- int i = 0;
- for (Operator leaf : leafs) {
- if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)
- leaf.setChildOperators(new ArrayList<Operator>());
- leaf.getChildOperators().add(mapChildren.get(i));
- i++;
- }
- i = 0;
- for (Operator child : mapChildren) {
- if (child.getParentOperators() == null || child.getParentOperators().size() == 0)
- child.setParentOperators(new ArrayList<Operator>());
- child.getParentOperators().add(leafs.get(i));
- i++;
- }
- }
- }
-
- // mark this task as visited
- this.tasksVisited.put(task, true);
- return mapRootOps;
- }
-
- /**
-     * descend to and collect the leaf nodes
- *
- * @param ops
- * @param leaves
- */
- private void downToLeaves(List<Operator> ops, List<Operator> leaves) {
-
- // Operator currentOp;
- for (Operator op : ops) {
- if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {
- downToLeaves(op.getChildOperators(), leaves);
- } else {
- if (op != null && leaves.indexOf(op) < 0)
- leaves.add(op);
- }
- }
- }
-
- private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {
- for (Operator op : roots) {
- List<Operator> children = op.getChildOperators();
- if (children == null || children.size() <= 0) {
- currentLeaves.add(op);
- } else {
- getLeaves(children, currentLeaves);
- }
- }
- }
-
- private void executeHyracksJob(JobSpecification job) throws Exception {
-
- /**
- * load the properties file if it is not loaded
- */
- if (clusterProps == null) {
- clusterProps = new Properties();
- InputStream confIn = new FileInputStream(clusterPropertiesPath);
- clusterProps.load(confIn);
- confIn.close();
- }
-
- if (hcc == null) {
- BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
- String masterNode = ipReader.readLine();
- ipReader.close();
-
- InetAddress[] ips = InetAddress.getAllByName(masterNode);
- int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
- for (InetAddress ip : ips) {
- if (ip.getAddress().length <= 4) {
- try {
- hcc = new HyracksConnection(ip.getHostAddress(), port);
- break;
- } catch (Exception e) {
- continue;
- }
- }
- }
- }
-
- long start = System.currentTimeMillis();
- JobId jobId = hcc.startJob(job);
- hcc.waitForCompletion(jobId);
-
- // System.out.println("job finished: " + jobId.toString());
-        // close all leaf operators to finalize their outputs
- for (Operator leaf : leaveOps) {
- jobClose(leaf);
- }
-
- long end = System.currentTimeMillis();
- System.err.println(start + " " + end + " " + (end - start));
- }
-
- /**
-     * move the output to its final directory on HDFS (not the actual final location)
- *
- * @param leaf
- * @throws Exception
- */
- private void jobClose(Operator leaf) throws Exception {
- FileSinkOperator fsOp = (FileSinkOperator) leaf;
- FileSinkDesc desc = fsOp.getConf();
- boolean isNativeTable = !desc.getTableInfo().isNonNative();
- if ((conf != null) && isNativeTable) {
- String specPath = desc.getDirName();
- DynamicPartitionCtx dpCtx = desc.getDynPartCtx();
- // for 0.7.0
- fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);
- // for 0.8.0
- // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,
- // desc);
- }
- }
-}
+package edu.uci.ics.hivesterix.runtime.exec;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+
+import edu.uci.ics.hivesterix.common.config.ConfUtil;
+import edu.uci.ics.hivesterix.logical.expression.HiveExpressionTypeComputer;
+import edu.uci.ics.hivesterix.logical.expression.HiveMergeAggregationExpressionFactory;
+import edu.uci.ics.hivesterix.logical.expression.HiveNullableTypeComputer;
+import edu.uci.ics.hivesterix.logical.expression.HivePartialAggregationTypeComputer;
+import edu.uci.ics.hivesterix.logical.plan.HiveAlgebricksTranslator;
+import edu.uci.ics.hivesterix.logical.plan.HiveLogicalPlanAndMetaData;
+import edu.uci.ics.hivesterix.optimizer.rulecollections.HiveRuleCollections;
+import edu.uci.ics.hivesterix.runtime.factory.evaluator.HiveExpressionRuntimeProvider;
+import edu.uci.ics.hivesterix.runtime.factory.nullwriter.HiveNullWriterFactory;
+import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryBooleanInspectorFactory;
+import edu.uci.ics.hivesterix.runtime.inspector.HiveBinaryIntegerInspectorFactory;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hivesterix.runtime.jobgen.HiveConnectorPolicyAssignmentPolicy.Policy;
+import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryComparatorFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveNormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HivePrinterFactoryProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveSerializerDeserializerProvider;
+import edu.uci.ics.hivesterix.runtime.provider.HiveTypeTraitProvider;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
+import edu.uci.ics.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder.DefaultOptimizationContextFactory;
+import edu.uci.ics.hyracks.algebricks.compiler.api.ICompiler;
+import edu.uci.ics.hyracks.algebricks.compiler.api.ICompilerFactory;
+import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
+import edu.uci.ics.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlanAndMetadata;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class HyracksExecutionEngine implements IExecutionEngine {
+
+ private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
+ private static final String clusterPropertiesPath = "conf/cluster.properties";
+ private static final String masterFilePath = "conf/master";
+
+ private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
+ private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
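+
+    // The logical rewrites below run as fixpoint or single passes, in order:
+    // normalization, condition push-down and join inference, field loading,
+    // operator push-down, data-exchange insertion, and consolidation; the two
+    // physical passes then rewrite the physical plan and prepare job generation.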
+ static {
+ SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);
+ SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);
+ SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
+ HiveRuleCollections.NORMALIZATION));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+ HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,
+ HiveRuleCollections.LOAD_FIELDS));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+ HiveRuleCollections.OP_PUSHDOWN));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+ HiveRuleCollections.DATA_EXCHANGE));
+ DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,
+ HiveRuleCollections.CONSOLIDATION));
+
+ DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+ HiveRuleCollections.PHYSICAL_PLAN_REWRITES));
+ DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,
+ HiveRuleCollections.prepareJobGenRules));
+ }
+
+ /**
+ * static configurations for compiler
+ */
+ private HeuristicCompilerFactoryBuilder builder;
+
+ /**
+ * compiler
+ */
+ private ICompiler compiler;
+
+ /**
+ * physical optimization config
+ */
+ private PhysicalOptimizationConfig physicalOptimizationConfig;
+
+ /**
+     * leaf (terminal) operators of the operator DAG
+ */
+ private List<Operator> leaveOps = new ArrayList<Operator>();
+
+ /**
+ * tasks that are already visited
+ */
+ private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();
+
+ /**
+ * hyracks job spec
+ */
+ private JobSpecification jobSpec;
+
+ /**
+ * hive configuration
+ */
+ private HiveConf conf;
+
+ /**
+ * plan printer
+ */
+ private PrintWriter planPrinter;
+
+ /**
+     * cluster properties
+ */
+ private Properties clusterProps;
+
+ /**
+ * the Hyracks client connection
+ */
+ private IHyracksClientConnection hcc;
+
+ public HyracksExecutionEngine(HiveConf conf) {
+ this.conf = conf;
+ init(conf);
+ }
+
+ public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {
+ this.conf = conf;
+ this.planPrinter = planPrinter;
+ init(conf);
+ }
+
+ private void init(HiveConf conf) {
+ builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);
+ builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);
+ builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);
+ builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);
+ builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);
+ builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);
+
+ long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);
+ long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);
+ int frameSize = conf.getInt("hive.algebricks.framesize", 32768);
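+        // defaults: 256MB (268435456) for the external group-by, 512MB (536870912)
+        // for the external sort, and 32KB (32768) frames -> 8192 and 16384 frames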
+
+ physicalOptimizationConfig = new PhysicalOptimizationConfig();
+ int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);
+ physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);
+ int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);
+ physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);
+ builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);
+ }
+
+ @Override
+ public int compileJob(List<Task<? extends Serializable>> rootTasks) {
+ // clean up
+ leaveOps.clear();
+ tasksVisited.clear();
+ jobSpec = null;
+
+ HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();
+ List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);
+
+        // get all leaf ops
+ getLeaves(rootOps, leaveOps);
+
+ HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();
+ try {
+ translator.translate(rootOps, null, aliasToPath);
+
+ ILogicalPlan plan = translator.genLogicalPlan();
+
+ if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {
+ translator.printOperators();
+ ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,
+ translator.getMetadataProvider());
+
+ ICompilerFactory compilerFactory = builder.create();
+ compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),
+ planAndMetadata.getMetadataProvider(), translator.getVariableCounter());
+
+ // run optimization and re-writing rules for Hive plan
+ compiler.optimize();
+
+ // print optimized plan
+ LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
+ StringBuilder buffer = new StringBuilder();
+ PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);
+ String planStr = buffer.toString();
+ System.out.println(planStr);
+
+ if (planPrinter != null)
+ planPrinter.print(planStr);
+ } else {
+                /** the plan is not a map-reduce task DAG; return a distinct code so the caller can fall back */
+ return 2;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ return 1;
+ }
+
+ return 0;
+ }
+
+ private void codeGen() throws AlgebricksException {
+ try {
+            // set the cluster locations to the node controllers (NCs) of the cluster
+ builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ // builder.setClusterTopology(ConfUtil.getClusterTopology());
+ builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);
+ builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);
+ builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);
+ builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);
+ builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);
+ builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);
+ builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);
+ builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);
+ builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);
+ builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);
+ builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);
+ builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);
+
+ jobSpec = compiler.createJob(null, null);
+
+ // set the policy
+ String policyStr = conf.get("hive.hyracks.connectorpolicy");
+ if (policyStr == null)
+ policyStr = "PIPELINING";
+ Policy policyValue = Policy.valueOf(policyStr);
+ jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));
+ jobSpec.setUseConnectorPolicyForScheduling(false);
+ }
+
+ @Override
+ public int executeJob() {
+ try {
+ codeGen();
+ executeHyracksJob(jobSpec);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return 1;
+ }
+ return 0;
+ }
+
+ private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,
+ HashMap<String, PartitionDesc> aliasToPath) {
+
+ List<Operator> rootOps = new ArrayList<Operator>();
+ List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();
+ tasksVisited.clear();
+
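+        // walk the root tasks in reverse, folding each map-reduce task's operator
+        // tree into one global operator DAG and recording its partition descriptors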
+ for (int i = rootTasks.size() - 1; i >= 0; i--) {
+ /**
+ * list of map-reduce tasks
+ */
+ Task<? extends Serializable> task = rootTasks.get(i);
+
+ if (task instanceof MapRedTask) {
+ List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);
+ if (i == 0)
+ rootOps.addAll(mapRootOps);
+ else {
+ List<Operator> leaves = new ArrayList<Operator>();
+ getLeaves(rootOps, leaves);
+
+ List<Operator> mapChildren = new ArrayList<Operator>();
+ for (Operator childMap : mapRootOps) {
+ if (childMap instanceof TableScanOperator) {
+ TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
+ if (topDesc == null)
+ mapChildren.add(childMap);
+ else {
+ rootOps.add(childMap);
+ }
+ } else
+ mapChildren.add(childMap);
+ }
+
+ if (mapChildren.size() > 0) {
+ for (Operator leaf : leaves)
+ leaf.setChildOperators(mapChildren);
+ for (Operator child : mapChildren)
+ child.setParentOperators(leaves);
+ }
+ }
+
+ MapredWork mr = (MapredWork) task.getWork();
+ HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
+
+ addAliasToPartition(aliasToPath, map);
+ toDelete.add(task);
+ }
+ }
+
+ for (Task<? extends Serializable> task : toDelete)
+ rootTasks.remove(task);
+
+ return rootOps;
+ }
+
+ private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {
+ Iterator<String> keys = map.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ PartitionDesc part = map.get(key);
+ String[] names = key.split(":");
+ for (String name : names) {
+ aliasToPath.put(name, part);
+ }
+ }
+ }
+
+ private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,
+ HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {
+ // System.out.println("!"+task.getName());
+ if (!(task instanceof MapRedTask)) {
+ if (!(task instanceof ConditionalTask)) {
+ rootTasks.add(task);
+ return null;
+ } else {
+                // recurse into the last map-reduce branch of the conditional task, if any
+ ConditionalTask condition = (ConditionalTask) task;
+ List<Task<? extends Serializable>> branches = condition.getListTasks();
+ for (int i = branches.size() - 1; i >= 0; i--) {
+ Task branch = branches.get(i);
+ if (branch instanceof MapRedTask) {
+ return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);
+ }
+ }
+ rootTasks.add(task);
+ return null;
+ }
+ }
+
+ MapredWork mr = (MapredWork) task.getWork();
+ HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();
+
+        // put all alias-to-PartitionDesc mappings into the map
+ addAliasToPartition(aliasToPath, map);
+
+ MapRedTask mrtask = (MapRedTask) task;
+ MapredWork work = (MapredWork) mrtask.getWork();
+ HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();
+
+ Set entries = operators.entrySet();
+ Iterator<Entry<String, Operator>> iterator = entries.iterator();
+ List<Operator> mapRootOps = new ArrayList<Operator>();
+
+ // get map root operators
+ while (iterator.hasNext()) {
+ Operator next = iterator.next().getValue();
+ if (!mapRootOps.contains(next)) {
+                // skip duplicates, which only arise in the union case
+ mapRootOps.add(next);
+ }
+ }
+
+ // get map local work
+ MapredLocalWork localWork = work.getMapLocalWork();
+ if (localWork != null) {
+ HashMap<String, Operator<? extends Serializable>> localOperators = localWork.getAliasToWork();
+
+ Set localEntries = localOperators.entrySet();
+ Iterator<Entry<String, Operator>> localIterator = localEntries.iterator();
+ while (localIterator.hasNext()) {
+ mapRootOps.add(localIterator.next().getValue());
+ }
+
+ HashMap<String, FetchWork> localFetch = localWork.getAliasToFetchWork();
+ Set localFetchEntries = localFetch.entrySet();
+ Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries.iterator();
+ while (localFetchIterator.hasNext()) {
+ Entry<String, FetchWork> fetchMap = localFetchIterator.next();
+ FetchWork fetch = fetchMap.getValue();
+ String alias = fetchMap.getKey();
+ List<PartitionDesc> dirPart = fetch.getPartDesc();
+
+ // temporary hack: put the first partitionDesc into the map
+ aliasToPath.put(alias, dirPart.get(0));
+ }
+ }
+
+ Boolean visited = tasksVisited.get(task);
+ if (visited != null && visited.booleanValue() == true) {
+ return mapRootOps;
+ }
+
+        // clear stale parents; this is needed only for the union operator
+ for (Operator op : mapRootOps)
+ if (op.getParentOperators() != null)
+ op.getParentOperators().clear();
+
+ List<Operator> mapLeaves = new ArrayList<Operator>();
+ downToLeaves(mapRootOps, mapLeaves);
+ List<Operator> reduceOps = new ArrayList<Operator>();
+
+ if (work.getReducer() != null)
+ reduceOps.add(work.getReducer());
+
+ for (Operator mapLeaf : mapLeaves) {
+ mapLeaf.setChildOperators(reduceOps);
+ }
+
+ for (Operator reduceOp : reduceOps) {
+ if (reduceOp != null)
+ reduceOp.setParentOperators(mapLeaves);
+ }
+
+ List<Operator> leafs = new ArrayList<Operator>();
+ if (reduceOps.size() > 0) {
+ downToLeaves(reduceOps, leafs);
+ } else {
+ leafs = mapLeaves;
+ }
+
+ List<Operator> mapChildren = new ArrayList<Operator>();
+ if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {
+ for (Object child : task.getChildTasks()) {
+ List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);
+ if (childMapOps == null)
+ continue;
+
+ for (Operator childMap : childMapOps) {
+ if (childMap instanceof TableScanOperator) {
+ TableScanDesc topDesc = (TableScanDesc) childMap.getConf();
+ if (topDesc == null)
+ mapChildren.add(childMap);
+ else {
+ rootOps.add(childMap);
+ }
+ } else {
+ // if not table scan, add the child
+ mapChildren.add(childMap);
+ }
+ }
+ }
+
+ if (mapChildren.size() > 0) {
+ int i = 0;
+ for (Operator leaf : leafs) {
+ if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)
+ leaf.setChildOperators(new ArrayList<Operator>());
+ leaf.getChildOperators().add(mapChildren.get(i));
+ i++;
+ }
+ i = 0;
+ for (Operator child : mapChildren) {
+ if (child.getParentOperators() == null || child.getParentOperators().size() == 0)
+ child.setParentOperators(new ArrayList<Operator>());
+ child.getParentOperators().add(leafs.get(i));
+ i++;
+ }
+ }
+ }
+
+ // mark this task as visited
+ this.tasksVisited.put(task, true);
+ return mapRootOps;
+ }
+
+ /**
+     * descend to and collect the leaf nodes
+ *
+ * @param ops
+ * @param leaves
+ */
+ private void downToLeaves(List<Operator> ops, List<Operator> leaves) {
+
+ // Operator currentOp;
+ for (Operator op : ops) {
+ if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {
+ downToLeaves(op.getChildOperators(), leaves);
+ } else {
+ if (op != null && leaves.indexOf(op) < 0)
+ leaves.add(op);
+ }
+ }
+ }
+
+ private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {
+ for (Operator op : roots) {
+ List<Operator> children = op.getChildOperators();
+ if (children == null || children.size() <= 0) {
+ currentLeaves.add(op);
+ } else {
+ getLeaves(children, currentLeaves);
+ }
+ }
+ }
+
+ private void executeHyracksJob(JobSpecification job) throws Exception {
+
+ /**
+ * load the properties file if it is not loaded
+ */
+ if (clusterProps == null) {
+ clusterProps = new Properties();
+ InputStream confIn = new FileInputStream(clusterPropertiesPath);
+ clusterProps.load(confIn);
+ confIn.close();
+ }
+
+ if (hcc == null) {
+ BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
+ String masterNode = ipReader.readLine();
+ ipReader.close();
+
+ InetAddress[] ips = InetAddress.getAllByName(masterNode);
+ int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
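+            // try each IPv4 address (getAddress().length <= 4) of the master node
+            // until a client connection to the cluster controller succeeds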
+ for (InetAddress ip : ips) {
+ if (ip.getAddress().length <= 4) {
+ try {
+ hcc = new HyracksConnection(ip.getHostAddress(), port);
+ break;
+ } catch (Exception e) {
+ continue;
+ }
+ }
+ }
+ }
+
+ long start = System.currentTimeMillis();
+ JobId jobId = hcc.startJob(job);
+ hcc.waitForCompletion(jobId);
+
+ // System.out.println("job finished: " + jobId.toString());
+        // close all leaf operators to finalize their outputs
+ for (Operator leaf : leaveOps) {
+ jobClose(leaf);
+ }
+
+ long end = System.currentTimeMillis();
+ System.err.println(start + " " + end + " " + (end - start));
+ }
+
+ /**
+     * move the output to its final directory on HDFS (not the actual final location)
+ *
+ * @param leaf
+ * @throws Exception
+ */
+ private void jobClose(Operator leaf) throws Exception {
+ FileSinkOperator fsOp = (FileSinkOperator) leaf;
+ FileSinkDesc desc = fsOp.getConf();
+ boolean isNativeTable = !desc.getTableInfo().isNonNative();
+ if ((conf != null) && isNativeTable) {
+ String specPath = desc.getDirName();
+ DynamicPartitionCtx dpCtx = desc.getDynPartCtx();
+ // for 0.7.0
+ fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);
+ // for 0.8.0
+ // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,
+ // desc);
+ }
+ }
+}
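
For reference, the return-code contract that compileJob now exposes is: 0 = plan
compiled into a Hyracks job, 1 = compilation error, 2 = not a map-reduce task DAG.
A minimal sketch of a caller honoring that contract follows; the CompileDispatch
harness and its fallback semantics are illustrative assumptions, not part of this
patch (only compileJob and executeJob are):

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;

    import edu.uci.ics.hivesterix.runtime.exec.HyracksExecutionEngine;

    public class CompileDispatch {
        /** Returns true iff the query was compiled and executed by Hyracks. */
        public static boolean tryHyracks(HyracksExecutionEngine engine,
                List<Task<? extends Serializable>> rootTasks) {
            int ret = engine.compileJob(rootTasks);
            if (ret == 0) {
                // plan compiled into a Hyracks job spec; run it on the cluster
                return engine.executeJob() == 0;
            }
            // ret == 2: not a map-reduce task DAG -- let native Hive handle it
            // ret == 1: compilation error (stack trace already printed)
            return false;
        }
    }

This mirrors what the Driver.java hunk below does in place: only a clean compile
flips the hivesterix flag.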
diff --git a/hivesterix-dist/src/main/java/org/apache/hadoop/hive/ql/Driver.java b/hivesterix-dist/src/main/java/org/apache/hadoop/hive/ql/Driver.java
index 4c40f5d..4ef74e9 100644
--- a/hivesterix-dist/src/main/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/hivesterix-dist/src/main/java/org/apache/hadoop/hive/ql/Driver.java
@@ -444,10 +444,11 @@
// hyracks run
if (sem instanceof SemanticAnalyzer && command.toLowerCase().indexOf("create") < 0) {
- hivesterix = true;
- return engine.compileJob(sem.getRootTasks());
+ int engineRet = engine.compileJob(sem.getRootTasks());
+ if (engineRet == 0) {
+ hivesterix = true;
+ }
}
-
return 0;
} catch (SemanticException e) {
errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
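
Note on the Driver.java hunk: previously any non-CREATE statement was marked as a
hivesterix run before compilation had even succeeded. Now the flag is set only when
compileJob returns 0; for codes 1 and 2 the method still returns 0, so (assuming the
surrounding Driver logic consults the hivesterix flag when choosing an execution
engine) semantic analysis is reported as successful and the statement proceeds down
Hive's stock MapReduce path instead.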