refactoring hivesterix codebase

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3080 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
index 025f423..d41bdc8 100644
--- a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
+++ b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
@@ -1,10 +1,16 @@
 package edu.uci.ics.hivesterix.common.config;

 

+import java.io.BufferedReader;

+import java.io.DataInputStream;

+import java.io.FileInputStream;

+import java.io.InputStream;

+import java.io.InputStreamReader;

 import java.net.InetAddress;

 import java.util.ArrayList;

 import java.util.HashMap;

 import java.util.List;

 import java.util.Map;

+import java.util.Properties;

 

 import org.apache.hadoop.fs.Path;

 import org.apache.hadoop.hive.conf.HiveConf;

@@ -28,6 +34,8 @@
     private static Map<String, List<String>> ncMapping;

     private static IHyracksClientConnection hcc = null;

     private static ClusterTopology topology = null;

+    private static final String clusterPropertiesPath = "conf/cluster.properties";

+    private static Properties clusterProps;

 

     public static JobConf getJobConf(Class<? extends InputFormat> format, Path path) {

         JobConf conf = new JobConf();

@@ -104,9 +112,24 @@
     private static void loadClusterConfig() {

         try {

             getHiveConf();

-            String ipAddress = hconf.get("hive.hyracks.host");

-            int port = Integer.parseInt(hconf.get("hive.hyracks.port"));

+

+            /**

+             * Load the cluster properties file if it has not been loaded yet.

+             */

+            if (clusterProps == null) {

+                clusterProps = new Properties();

+                InputStream confIn = new FileInputStream(clusterPropertiesPath);

+                clusterProps.load(confIn);

+                confIn.close();

+            }

+            Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");

+            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new DataInputStream(

+                    process.getInputStream())));

+            String ipAddress = ipReader.readLine();

+            ipReader.close();

+            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

             int mpl = Integer.parseInt(hconf.get("hive.hyracks.parrallelism"));

+

             hcc = new HyracksConnection(ipAddress, port);

             topology = hcc.getClusterTopology();

             Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();

diff --git a/hivesterix/hivesterix-dist/conf/cluster b/hivesterix/hivesterix-dist/conf/cluster
deleted file mode 100644
index 6cc8cca..0000000
--- a/hivesterix/hivesterix-dist/conf/cluster
+++ /dev/null
@@ -1,11 +0,0 @@
-4
-10.0.0.1 asterix-001
-10.0.0.2 asterix-002
-10.0.0.3 asterix-003
-10.0.0.4 asterix-004
-10.0.0.5 asterix-005
-10.0.0.6 asterix-006
-10.0.0.7 asterix-007
-10.0.0.8 asterix-008
-10.0.0.9 asterix-009
-10.0.0.10 asterix-010
diff --git a/hivesterix/hivesterix-dist/conf/cluster.properties b/hivesterix/hivesterix-dist/conf/cluster.properties
new file mode 100644
index 0000000..2d2401a
--- /dev/null
+++ b/hivesterix/hivesterix-dist/conf/cluster.properties
@@ -0,0 +1,37 @@
+#The CC port for Hyracks clients
+CC_CLIENTPORT=3099
+
+#The CC port for Hyracks cluster management
+CC_CLUSTERPORT=1099
+
+#The directory of hyracks binaries
+HYRACKS_HOME=../../../../hyracks
+
+#The tmp directory for cc to install jars
+CCTMP_DIR=/tmp/t1
+
+#The tmp directory for nc to install jars
+NCTMP_DIR=/tmp/t2
+
+#The directory to put cc logs
+CCLOGS_DIR=$CCTMP_DIR/logs
+
+#The directory to put nc logs
+NCLOGS_DIR=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS="/tmp/t3,/tmp/t4"
+
+#The JAVA_HOME
+JAVA_HOME=$JAVA_HOME
+
+#The frame size of the internal dataflow engine
+FRAME_SIZE=65536
+
+#CC JAVA_OPTS
+CCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7001,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+# YourKit option: -agentpath:/grid/0/dev/vborkar/tools/yjp-10.0.4/bin/linux-x86-64/libyjpagent.so=port=20001
+
+#NC JAVA_OPTS
+NCJAVA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=7002,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/hivesterix/hivesterix-dist/conf/debugnc.properties b/hivesterix/hivesterix-dist/conf/debugnc.properties
new file mode 100755
index 0000000..27afa26
--- /dev/null
+++ b/hivesterix/hivesterix-dist/conf/debugnc.properties
@@ -0,0 +1,12 @@
+#The tmp directory for nc to install jars
+NCTMP_DIR2=/tmp/t-1
+
+#The directory to put nc logs
+NCLOGS_DIR2=$NCTMP_DIR/logs
+
+#Comma separated I/O directories for the spilling of external sort
+IO_DIRS2="/tmp/t-2,/tmp/t-3"
+
+#NC JAVA_OPTS
+NCJAVA_OPTS2="-Xdebug -Xrunjdwp:transport=dt_socket,address=7003,server=y,suspend=n -Xmx1g -Djava.util.logging.config.file=logging.properties"
+
diff --git a/hivesterix/hivesterix-dist/conf/hive-default.xml b/hivesterix/hivesterix-dist/conf/hive-default.xml
index 034ea61..587eede 100644
--- a/hivesterix/hivesterix-dist/conf/hive-default.xml
+++ b/hivesterix/hivesterix-dist/conf/hive-default.xml
@@ -23,22 +23,11 @@
 			By setting this property to -1, Hive will automatically figure out what
 			should be the number of reducers.
   </description>
-	</property>
 
-	<property>
-		<name>hive.hyracks.host</name>
-		<value>128.195.14.4</value>
-	</property>
-
-	<property>
-		<name>hive.hyracks.port</name>
-		<value>3099</value>
-	</property>
-
-	<property>
-		<name>hive.hyracks.app</name>
-		<value>hivesterix</value>
-	</property>
+        <property>
+		<name>hive.hyracks.connectorpolicy</name>
+		<value>PIPELINING</value>
+        </property>
 
 	<property>
 		<name>hive.hyracks.parrallelism</name>
@@ -52,12 +41,12 @@
 	
 	<property>
 		<name>hive.algebricks.groupby.external.memory</name>
-		<value>536870912</value>
+		<value>33554432</value>
 	</property>
 	
 	<property>
 		<name>hive.algebricks.sort.memory</name>
-		<value>536870912</value>
+		<value>33554432</value>
 	</property>
 
 	<property>
diff --git a/hivesterix/hivesterix-dist/conf/master b/hivesterix/hivesterix-dist/conf/master
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/hivesterix/hivesterix-dist/conf/master
@@ -0,0 +1 @@
+localhost
diff --git a/hivesterix/hivesterix-dist/conf/slaves b/hivesterix/hivesterix-dist/conf/slaves
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/hivesterix/hivesterix-dist/conf/slaves
@@ -0,0 +1 @@
+localhost
diff --git a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index 88ecf6d..e6f47cf 100644
--- a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -1,5 +1,10 @@
 package edu.uci.ics.hivesterix.runtime.exec;

 

+import java.io.BufferedReader;

+import java.io.DataInputStream;

+import java.io.FileInputStream;

+import java.io.InputStream;

+import java.io.InputStreamReader;

 import java.io.PrintWriter;

 import java.io.Serializable;

 import java.util.ArrayList;

@@ -8,6 +13,7 @@
 import java.util.List;

 import java.util.Map;

 import java.util.Map.Entry;

+import java.util.Properties;

 import java.util.Set;

 

 import org.apache.commons.logging.Log;

@@ -72,524 +78,502 @@
 @SuppressWarnings({ "rawtypes", "unchecked" })

 public class HyracksExecutionEngine implements IExecutionEngine {

 

-	private static final Log LOG = LogFactory

-			.getLog(HyracksExecutionEngine.class.getName());

+    private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());

+    private static final String clusterPropertiesPath = "conf/cluster.properties";

 

-	private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

-	private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

-	static {

-		SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(

-				false);

-		SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(

-				true);

-		SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(

-				true);

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlFullDfs, HiveRuleCollections.NORMALIZATION));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlNoDfs,

-						HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlFullDfs, HiveRuleCollections.LOAD_FIELDS));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlNoDfs, HiveRuleCollections.OP_PUSHDOWN));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqOnceCtrl, HiveRuleCollections.DATA_EXCHANGE));

-		DEFAULT_LOGICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqCtrlNoDfs, HiveRuleCollections.CONSOLIDATION));

+    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

+    private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

+    static {

+        SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false);

+        SequentialFixpointRuleController seqCtrlFullDfs = new SequentialFixpointRuleController(true);

+        SequentialOnceRuleController seqOnceCtrl = new SequentialOnceRuleController(true);

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,

+                HiveRuleCollections.NORMALIZATION));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,

+                HiveRuleCollections.COND_PUSHDOWN_AND_JOIN_INFERENCE));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlFullDfs,

+                HiveRuleCollections.LOAD_FIELDS));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,

+                HiveRuleCollections.OP_PUSHDOWN));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,

+                HiveRuleCollections.DATA_EXCHANGE));

+        DEFAULT_LOGICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqCtrlNoDfs,

+                HiveRuleCollections.CONSOLIDATION));

 

-		DEFAULT_PHYSICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqOnceCtrl, HiveRuleCollections.PHYSICAL_PLAN_REWRITES));

-		DEFAULT_PHYSICAL_REWRITES

-				.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(

-						seqOnceCtrl, HiveRuleCollections.prepareJobGenRules));

-	}

+        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,

+                HiveRuleCollections.PHYSICAL_PLAN_REWRITES));

+        DEFAULT_PHYSICAL_REWRITES.add(new Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>(seqOnceCtrl,

+                HiveRuleCollections.prepareJobGenRules));

+    }

 

-	/**

-	 * static configurations for compiler

-	 */

-	private HeuristicCompilerFactoryBuilder builder;

+    /**

+     * static configurations for compiler

+     */

+    private HeuristicCompilerFactoryBuilder builder;

 

-	/**

-	 * compiler

-	 */

-	private ICompiler compiler;

+    /**

+     * compiler

+     */

+    private ICompiler compiler;

 

-	/**

-	 * physical optimization config

-	 */

-	private PhysicalOptimizationConfig physicalOptimizationConfig;

+    /**

+     * physical optimization config

+     */

+    private PhysicalOptimizationConfig physicalOptimizationConfig;

 

-	/**

-	 * final ending operators

-	 */

-	private List<Operator> leaveOps = new ArrayList<Operator>();

+    /**

+     * final ending operators

+     */

+    private List<Operator> leaveOps = new ArrayList<Operator>();

 

-	/**

-	 * tasks that are already visited

-	 */

-	private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();

+    /**

+     * tasks that are already visited

+     */

+    private Map<Task<? extends Serializable>, Boolean> tasksVisited = new HashMap<Task<? extends Serializable>, Boolean>();

 

-	/**

-	 * hyracks job spec

-	 */

-	private JobSpecification jobSpec;

+    /**

+     * hyracks job spec

+     */

+    private JobSpecification jobSpec;

 

-	/**

-	 * hive configuration

-	 */

-	private HiveConf conf;

+    /**

+     * hive configuration

+     */

+    private HiveConf conf;

 

-	/**

-	 * plan printer

-	 */

-	private PrintWriter planPrinter;

+    /**

+     * plan printer

+     */

+    private PrintWriter planPrinter;

 

-	public HyracksExecutionEngine(HiveConf conf) {

-		this.conf = conf;

-		init(conf);

-	}

+    /**

+     * cluster properties

+     */

+    private Properties clusterProps;

 

-	public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {

-		this.conf = conf;

-		this.planPrinter = planPrinter;

-		init(conf);

-	}

+    public HyracksExecutionEngine(HiveConf conf) {

+        this.conf = conf;

+        init(conf);

+    }

 

-	private void init(HiveConf conf) {

-		builder = new HeuristicCompilerFactoryBuilder(

-				DefaultOptimizationContextFactory.INSTANCE);

-		builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);

-		builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);

-		builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);

-		builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);

-		builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);

+    public HyracksExecutionEngine(HiveConf conf, PrintWriter planPrinter) {

+        this.conf = conf;

+        this.planPrinter = planPrinter;

+        init(conf);

+    }

 

-		long memSizeExternalGby = conf.getLong(

-				"hive.algebricks.groupby.external.memory", 268435456);

-		long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory",

-				536870912);

-		int frameSize = conf.getInt("hive.algebricks.framesize", 32768);

+    private void init(HiveConf conf) {

+        builder = new HeuristicCompilerFactoryBuilder(DefaultOptimizationContextFactory.INSTANCE);

+        builder.setLogicalRewrites(DEFAULT_LOGICAL_REWRITES);

+        builder.setPhysicalRewrites(DEFAULT_PHYSICAL_REWRITES);

+        builder.setIMergeAggregationExpressionFactory(HiveMergeAggregationExpressionFactory.INSTANCE);

+        builder.setExpressionTypeComputer(HiveExpressionTypeComputer.INSTANCE);

+        builder.setNullableTypeComputer(HiveNullableTypeComputer.INSTANCE);

 

-		physicalOptimizationConfig = new PhysicalOptimizationConfig();

-		int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);

-		physicalOptimizationConfig

-				.setMaxFramesExternalGroupBy(frameLimitExtGby);

-		int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);

-		physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);

-		builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);

-	}

+        long memSizeExternalGby = conf.getLong("hive.algebricks.groupby.external.memory", 268435456);

+        long memSizeExternalSort = conf.getLong("hive.algebricks.sort.memory", 536870912);

+        int frameSize = conf.getInt("hive.algebricks.framesize", 32768);

 

-	@Override

-	public int compileJob(List<Task<? extends Serializable>> rootTasks) {

-		// clean up

-		leaveOps.clear();

-		tasksVisited.clear();

-		jobSpec = null;

+        physicalOptimizationConfig = new PhysicalOptimizationConfig();

+        int frameLimitExtGby = (int) (memSizeExternalGby / frameSize);

+        physicalOptimizationConfig.setMaxFramesExternalGroupBy(frameLimitExtGby);

+        int frameLimitExtSort = (int) (memSizeExternalSort / frameSize);

+        physicalOptimizationConfig.setMaxFramesExternalSort(frameLimitExtSort);

+        builder.setPhysicalOptimizationConfig(physicalOptimizationConfig);

+    }

 

-		HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();

-		List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);

+    @Override

+    public int compileJob(List<Task<? extends Serializable>> rootTasks) {

+        // clean up

+        leaveOps.clear();

+        tasksVisited.clear();

+        jobSpec = null;

 

-		// get all leave Ops

-		getLeaves(rootOps, leaveOps);

+        HashMap<String, PartitionDesc> aliasToPath = new HashMap<String, PartitionDesc>();

+        List<Operator> rootOps = generateRootOperatorDAG(rootTasks, aliasToPath);

 

-		HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();

-		try {

-			translator.translate(rootOps, null, aliasToPath);

+        // collect all leaf operators

+        getLeaves(rootOps, leaveOps);

 

-			ILogicalPlan plan = translator.genLogicalPlan();

+        HiveAlgebricksTranslator translator = new HiveAlgebricksTranslator();

+        try {

+            translator.translate(rootOps, null, aliasToPath);

 

-			if (plan.getRoots() != null && plan.getRoots().size() > 0

-					&& plan.getRoots().get(0).getValue() != null) {

-				translator.printOperators();

-				ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(

-						plan, translator.getMetadataProvider());

+            ILogicalPlan plan = translator.genLogicalPlan();

 

-				ICompilerFactory compilerFactory = builder.create();

-				compiler = compilerFactory.createCompiler(

-						planAndMetadata.getPlan(),

-						planAndMetadata.getMetadataProvider(),

-						translator.getVariableCounter());

+            if (plan.getRoots() != null && plan.getRoots().size() > 0 && plan.getRoots().get(0).getValue() != null) {

+                translator.printOperators();

+                ILogicalPlanAndMetadata planAndMetadata = new HiveLogicalPlanAndMetaData(plan,

+                        translator.getMetadataProvider());

 

-				// run optimization and re-writing rules for Hive plan

-				compiler.optimize();

+                ICompilerFactory compilerFactory = builder.create();

+                compiler = compilerFactory.createCompiler(planAndMetadata.getPlan(),

+                        planAndMetadata.getMetadataProvider(), translator.getVariableCounter());

 

-				// print optimized plan

-				LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();

-				StringBuilder buffer = new StringBuilder();

-				PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);

-				String planStr = buffer.toString();

-				System.out.println(planStr);

+                // run optimization and re-writing rules for Hive plan

+                compiler.optimize();

 

-				if (planPrinter != null)

-					planPrinter.print(planStr);

-			}

-		} catch (Exception e) {

-			e.printStackTrace();

-			return 1;

-		}

+                // print optimized plan

+                LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();

+                StringBuilder buffer = new StringBuilder();

+                PlanPrettyPrinter.printPlan(plan, buffer, pvisitor, 0);

+                String planStr = buffer.toString();

+                System.out.println(planStr);

 

-		return 0;

-	}

+                if (planPrinter != null)

+                    planPrinter.print(planStr);

+            }

+        } catch (Exception e) {

+            e.printStackTrace();

+            return 1;

+        }

 

-	private void codeGen() throws AlgebricksException {

-		try {

-			// number of cpu cores in the cluster

-			builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(

-					ConfUtil.getNCs()));

-		} catch (Exception e) {

-			throw new AlgebricksException(e);

-		}

-		// builder.setClusterTopology(ConfUtil.getClusterTopology());

-		builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);

-		builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);

-		builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);

-		builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);

-		builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);

-		builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);

-		builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);

-		builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);

-		builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);

-		builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);

-		builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);

-		builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);

+        return 0;

+    }

 

-		jobSpec = compiler.createJob(null);

+    private void codeGen() throws AlgebricksException {

+        try {

+            // number of cpu cores in the cluster

+            builder.setClusterLocations(new AlgebricksAbsolutePartitionConstraint(ConfUtil.getNCs()));

+        } catch (Exception e) {

+            throw new AlgebricksException(e);

+        }

+        // builder.setClusterTopology(ConfUtil.getClusterTopology());

+        builder.setBinaryBooleanInspectorFactory(HiveBinaryBooleanInspectorFactory.INSTANCE);

+        builder.setBinaryIntegerInspectorFactory(HiveBinaryIntegerInspectorFactory.INSTANCE);

+        builder.setComparatorFactoryProvider(HiveBinaryComparatorFactoryProvider.INSTANCE);

+        builder.setExpressionRuntimeProvider(HiveExpressionRuntimeProvider.INSTANCE);

+        builder.setHashFunctionFactoryProvider(HiveBinaryHashFunctionFactoryProvider.INSTANCE);

+        builder.setPrinterProvider(HivePrinterFactoryProvider.INSTANCE);

+        builder.setSerializerDeserializerProvider(HiveSerializerDeserializerProvider.INSTANCE);

+        builder.setNullWriterFactory(HiveNullWriterFactory.INSTANCE);

+        builder.setNormalizedKeyComputerFactoryProvider(HiveNormalizedKeyComputerFactoryProvider.INSTANCE);

+        builder.setPartialAggregationTypeComputer(HivePartialAggregationTypeComputer.INSTANCE);

+        builder.setTypeTraitProvider(HiveTypeTraitProvider.INSTANCE);

+        builder.setHashFunctionFamilyProvider(HiveBinaryHashFunctionFamilyProvider.INSTANCE);

 

-		// set the policy

-		String policyStr = conf.get("hive.hyracks.connectorpolicy");

-		if (policyStr == null)

-			policyStr = "PIPELINING";

-		Policy policyValue = Policy.valueOf(policyStr);

-		jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(

-				policyValue));

-		jobSpec.setUseConnectorPolicyForScheduling(false);

-	}

+        jobSpec = compiler.createJob(null);

 

-	@Override

-	public int executeJob() {

-		try {

-			codeGen();

-			executeHyraxJob(jobSpec);

-		} catch (Exception e) {

-			e.printStackTrace();

-			return 1;

-		}

-		return 0;

-	}

+        // set the policy

+        String policyStr = conf.get("hive.hyracks.connectorpolicy");

+        if (policyStr == null)

+            policyStr = "PIPELINING";

+        Policy policyValue = Policy.valueOf(policyStr);

+        jobSpec.setConnectorPolicyAssignmentPolicy(new HiveConnectorPolicyAssignmentPolicy(policyValue));

+        jobSpec.setUseConnectorPolicyForScheduling(false);

+    }

 

-	private List<Operator> generateRootOperatorDAG(

-			List<Task<? extends Serializable>> rootTasks,

-			HashMap<String, PartitionDesc> aliasToPath) {

+    @Override

+    public int executeJob() {

+        try {

+            codeGen();

+            executeHyracksJob(jobSpec);

+        } catch (Exception e) {

+            e.printStackTrace();

+            return 1;

+        }

+        return 0;

+    }

 

-		List<Operator> rootOps = new ArrayList<Operator>();

-		List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();

-		tasksVisited.clear();

+    private List<Operator> generateRootOperatorDAG(List<Task<? extends Serializable>> rootTasks,

+            HashMap<String, PartitionDesc> aliasToPath) {

 

-		for (int i = rootTasks.size() - 1; i >= 0; i--) {

-			/**

-			 * list of map-reduce tasks

-			 */

-			Task<? extends Serializable> task = rootTasks.get(i);

+        List<Operator> rootOps = new ArrayList<Operator>();

+        List<Task<? extends Serializable>> toDelete = new ArrayList<Task<? extends Serializable>>();

+        tasksVisited.clear();

 

-			if (task instanceof MapRedTask) {

-				List<Operator> mapRootOps = articulateMapReduceOperators(task,

-						rootOps, aliasToPath, rootTasks);

-				if (i == 0)

-					rootOps.addAll(mapRootOps);

-				else {

-					List<Operator> leaves = new ArrayList<Operator>();

-					getLeaves(rootOps, leaves);

+        for (int i = rootTasks.size() - 1; i >= 0; i--) {

+            /**

+             * list of map-reduce tasks

+             */

+            Task<? extends Serializable> task = rootTasks.get(i);

 

-					List<Operator> mapChildren = new ArrayList<Operator>();

-					for (Operator childMap : mapRootOps) {

-						if (childMap instanceof TableScanOperator) {

-							TableScanDesc topDesc = (TableScanDesc) childMap

-									.getConf();

-							if (topDesc == null)

-								mapChildren.add(childMap);

-							else {

-								rootOps.add(childMap);

-							}

-						} else

-							mapChildren.add(childMap);

-					}

+            if (task instanceof MapRedTask) {

+                List<Operator> mapRootOps = articulateMapReduceOperators(task, rootOps, aliasToPath, rootTasks);

+                if (i == 0)

+                    rootOps.addAll(mapRootOps);

+                else {

+                    List<Operator> leaves = new ArrayList<Operator>();

+                    getLeaves(rootOps, leaves);

 

-					if (mapChildren.size() > 0) {

-						for (Operator leaf : leaves)

-							leaf.setChildOperators(mapChildren);

-						for (Operator child : mapChildren)

-							child.setParentOperators(leaves);

-					}

-				}

+                    List<Operator> mapChildren = new ArrayList<Operator>();

+                    for (Operator childMap : mapRootOps) {

+                        if (childMap instanceof TableScanOperator) {

+                            TableScanDesc topDesc = (TableScanDesc) childMap.getConf();

+                            if (topDesc == null)

+                                mapChildren.add(childMap);

+                            else {

+                                rootOps.add(childMap);

+                            }

+                        } else

+                            mapChildren.add(childMap);

+                    }

 

-				MapredWork mr = (MapredWork) task.getWork();

-				HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

+                    if (mapChildren.size() > 0) {

+                        for (Operator leaf : leaves)

+                            leaf.setChildOperators(mapChildren);

+                        for (Operator child : mapChildren)

+                            child.setParentOperators(leaves);

+                    }

+                }

 

-				addAliasToPartition(aliasToPath, map);

-				toDelete.add(task);

-			}

-		}

+                MapredWork mr = (MapredWork) task.getWork();

+                HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

 

-		for (Task<? extends Serializable> task : toDelete)

-			rootTasks.remove(task);

+                addAliasToPartition(aliasToPath, map);

+                toDelete.add(task);

+            }

+        }

+

+        for (Task<? extends Serializable> task : toDelete)

+            rootTasks.remove(task);

+

+        return rootOps;

+    }

 

-		return rootOps;

-	}

+    private void addAliasToPartition(HashMap<String, PartitionDesc> aliasToPath, HashMap<String, PartitionDesc> map) {

+        Iterator<String> keys = map.keySet().iterator();

+        while (keys.hasNext()) {

+            String key = keys.next();

+            PartitionDesc part = map.get(key);

+            String[] names = key.split(":");

+            for (String name : names) {

+                aliasToPath.put(name, part);

+            }

+        }

+    }

 

-	private void addAliasToPartition(

-			HashMap<String, PartitionDesc> aliasToPath,

-			HashMap<String, PartitionDesc> map) {

-		Iterator<String> keys = map.keySet().iterator();

-		while (keys.hasNext()) {

-			String key = keys.next();

-			PartitionDesc part = map.get(key);

-			String[] names = key.split(":");

-			for (String name : names) {

-				aliasToPath.put(name, part);

-			}

-		}

-	}

+    private List<Operator> articulateMapReduceOperators(Task task, List<Operator> rootOps,

+            HashMap<String, PartitionDesc> aliasToPath, List<Task<? extends Serializable>> rootTasks) {

+        // System.out.println("!"+task.getName());

+        if (!(task instanceof MapRedTask)) {

+            if (!(task instanceof ConditionalTask)) {

+                rootTasks.add(task);

+                return null;

+            } else {

+                // remove map-reduce branches in condition task

+                ConditionalTask condition = (ConditionalTask) task;

+                List<Task<? extends Serializable>> branches = condition.getListTasks();

+                for (int i = branches.size() - 1; i >= 0; i--) {

+                    Task branch = branches.get(i);

+                    if (branch instanceof MapRedTask) {

+                        return articulateMapReduceOperators(branch, rootOps, aliasToPath, rootTasks);

+                    }

+                }

+                rootTasks.add(task);

+                return null;

+            }

+        }

 

-	private List<Operator> articulateMapReduceOperators(Task task,

-			List<Operator> rootOps, HashMap<String, PartitionDesc> aliasToPath,

-			List<Task<? extends Serializable>> rootTasks) {

-		// System.out.println("!"+task.getName());

-		if (!(task instanceof MapRedTask)) {

-			if (!(task instanceof ConditionalTask)) {

-				rootTasks.add(task);

-				return null;

-			} else {

-				// remove map-reduce branches in condition task

-				ConditionalTask condition = (ConditionalTask) task;

-				List<Task<? extends Serializable>> branches = condition

-						.getListTasks();

-				for (int i = branches.size() - 1; i >= 0; i--) {

-					Task branch = branches.get(i);

-					if (branch instanceof MapRedTask) {

-						return articulateMapReduceOperators(branch, rootOps,

-								aliasToPath, rootTasks);

-					}

-				}

-				rootTasks.add(task);

-				return null;

-			}

-		}

+        MapredWork mr = (MapredWork) task.getWork();

+        HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

 

-		MapredWork mr = (MapredWork) task.getWork();

-		HashMap<String, PartitionDesc> map = mr.getAliasToPartnInfo();

+        // put all aliasToPartitionDesc mapping into the map

+        addAliasToPartition(aliasToPath, map);

 

-		// put all aliasToParitionDesc mapping into the map

-		addAliasToPartition(aliasToPath, map);

+        MapRedTask mrtask = (MapRedTask) task;

+        MapredWork work = (MapredWork) mrtask.getWork();

+        HashMap<String, Operator<? extends Serializable>> operators = work.getAliasToWork();

 

-		MapRedTask mrtask = (MapRedTask) task;

-		MapredWork work = (MapredWork) mrtask.getWork();

-		HashMap<String, Operator<? extends Serializable>> operators = work

-				.getAliasToWork();

+        Set entries = operators.entrySet();

+        Iterator<Entry<String, Operator>> iterator = entries.iterator();

+        List<Operator> mapRootOps = new ArrayList<Operator>();

 

-		Set entries = operators.entrySet();

-		Iterator<Entry<String, Operator>> iterator = entries.iterator();

-		List<Operator> mapRootOps = new ArrayList<Operator>();

+        // get map root operators

+        while (iterator.hasNext()) {

+            Operator next = iterator.next().getValue();

+            if (!mapRootOps.contains(next)) {

+                // clear that only for the case of union

+                mapRootOps.add(next);

+            }

+        }

 

-		// get map root operators

-		while (iterator.hasNext()) {

-			Operator next = iterator.next().getValue();

-			if (!mapRootOps.contains(next)) {

-				// clear that only for the case of union

-				mapRootOps.add(next);

-			}

-		}

+        // get map local work

+        MapredLocalWork localWork = work.getMapLocalWork();

+        if (localWork != null) {

+            HashMap<String, Operator<? extends Serializable>> localOperators = localWork.getAliasToWork();

 

-		// get map local work

-		MapredLocalWork localWork = work.getMapLocalWork();

-		if (localWork != null) {

-			HashMap<String, Operator<? extends Serializable>> localOperators = localWork

-					.getAliasToWork();

+            Set localEntries = localOperators.entrySet();

+            Iterator<Entry<String, Operator>> localIterator = localEntries.iterator();

+            while (localIterator.hasNext()) {

+                mapRootOps.add(localIterator.next().getValue());

+            }

 

-			Set localEntries = localOperators.entrySet();

-			Iterator<Entry<String, Operator>> localIterator = localEntries

-					.iterator();

-			while (localIterator.hasNext()) {

-				mapRootOps.add(localIterator.next().getValue());

-			}

+            HashMap<String, FetchWork> localFetch = localWork.getAliasToFetchWork();

+            Set localFetchEntries = localFetch.entrySet();

+            Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries.iterator();

+            while (localFetchIterator.hasNext()) {

+                Entry<String, FetchWork> fetchMap = localFetchIterator.next();

+                FetchWork fetch = fetchMap.getValue();

+                String alias = fetchMap.getKey();

+                List<PartitionDesc> dirPart = fetch.getPartDesc();

 

-			HashMap<String, FetchWork> localFetch = localWork

-					.getAliasToFetchWork();

-			Set localFetchEntries = localFetch.entrySet();

-			Iterator<Entry<String, FetchWork>> localFetchIterator = localFetchEntries

-					.iterator();

-			while (localFetchIterator.hasNext()) {

-				Entry<String, FetchWork> fetchMap = localFetchIterator.next();

-				FetchWork fetch = fetchMap.getValue();

-				String alias = fetchMap.getKey();

-				List<PartitionDesc> dirPart = fetch.getPartDesc();

+                // temporary hack: put the first partitionDesc into the map

+                aliasToPath.put(alias, dirPart.get(0));

+            }

+        }

 

-				// temporary hack: put the first partitionDesc into the map

-				aliasToPath.put(alias, dirPart.get(0));

-			}

-		}

+        Boolean visited = tasksVisited.get(task);

+        if (visited != null && visited.booleanValue() == true) {

+            return mapRootOps;

+        }

 

-		Boolean visited = tasksVisited.get(task);

-		if (visited != null && visited.booleanValue() == true) {

-			return mapRootOps;

-		}

+        // do that only for union operator

+        for (Operator op : mapRootOps)

+            if (op.getParentOperators() != null)

+                op.getParentOperators().clear();

 

-		// do that only for union operator

-		for (Operator op : mapRootOps)

-			if (op.getParentOperators() != null)

-				op.getParentOperators().clear();

+        List<Operator> mapLeaves = new ArrayList<Operator>();

+        downToLeaves(mapRootOps, mapLeaves);

+        List<Operator> reduceOps = new ArrayList<Operator>();

 

-		List<Operator> mapLeaves = new ArrayList<Operator>();

-		downToLeaves(mapRootOps, mapLeaves);

-		List<Operator> reduceOps = new ArrayList<Operator>();

+        if (work.getReducer() != null)

+            reduceOps.add(work.getReducer());

 

-		if (work.getReducer() != null)

-			reduceOps.add(work.getReducer());

+        for (Operator mapLeaf : mapLeaves) {

+            mapLeaf.setChildOperators(reduceOps);

+        }

 

-		for (Operator mapLeaf : mapLeaves) {

-			mapLeaf.setChildOperators(reduceOps);

-		}

+        for (Operator reduceOp : reduceOps) {

+            if (reduceOp != null)

+                reduceOp.setParentOperators(mapLeaves);

+        }

 

-		for (Operator reduceOp : reduceOps) {

-			if (reduceOp != null)

-				reduceOp.setParentOperators(mapLeaves);

-		}

+        List<Operator> leafs = new ArrayList<Operator>();

+        if (reduceOps.size() > 0) {

+            downToLeaves(reduceOps, leafs);

+        } else {

+            leafs = mapLeaves;

+        }

 

-		List<Operator> leafs = new ArrayList<Operator>();

-		if (reduceOps.size() > 0) {

-			downToLeaves(reduceOps, leafs);

-		} else {

-			leafs = mapLeaves;

-		}

+        List<Operator> mapChildren = new ArrayList<Operator>();

+        if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {

+            for (Object child : task.getChildTasks()) {

+                List<Operator> childMapOps = articulateMapReduceOperators((Task) child, rootOps, aliasToPath, rootTasks);

+                if (childMapOps == null)

+                    continue;

 

-		List<Operator> mapChildren = new ArrayList<Operator>();

-		if (task.getChildTasks() != null && task.getChildTasks().size() > 0) {

-			for (Object child : task.getChildTasks()) {

-				List<Operator> childMapOps = articulateMapReduceOperators(

-						(Task) child, rootOps, aliasToPath, rootTasks);

-				if (childMapOps == null)

-					continue;

+                for (Operator childMap : childMapOps) {

+                    if (childMap instanceof TableScanOperator) {

+                        TableScanDesc topDesc = (TableScanDesc) childMap.getConf();

+                        if (topDesc == null)

+                            mapChildren.add(childMap);

+                        else {

+                            rootOps.add(childMap);

+                        }

+                    } else {

+                        // if not table scan, add the child

+                        mapChildren.add(childMap);

+                    }

+                }

+            }

 

-				for (Operator childMap : childMapOps) {

-					if (childMap instanceof TableScanOperator) {

-						TableScanDesc topDesc = (TableScanDesc) childMap

-								.getConf();

-						if (topDesc == null)

-							mapChildren.add(childMap);

-						else {

-							rootOps.add(childMap);

-						}

-					} else {

-						// if not table scan, add the child

-						mapChildren.add(childMap);

-					}

-				}

-			}

+            if (mapChildren.size() > 0) {

+                int i = 0;

+                for (Operator leaf : leafs) {

+                    if (leaf.getChildOperators() == null || leaf.getChildOperators().size() == 0)

+                        leaf.setChildOperators(new ArrayList<Operator>());

+                    leaf.getChildOperators().add(mapChildren.get(i));

+                    i++;

+                }

+                i = 0;

+                for (Operator child : mapChildren) {

+                    if (child.getParentOperators() == null || child.getParentOperators().size() == 0)

+                        child.setParentOperators(new ArrayList<Operator>());

+                    child.getParentOperators().add(leafs.get(i));

+                    i++;

+                }

+            }

+        }

 

-			if (mapChildren.size() > 0) {

-				int i = 0;

-				for (Operator leaf : leafs) {

-					if (leaf.getChildOperators() == null

-							|| leaf.getChildOperators().size() == 0)

-						leaf.setChildOperators(new ArrayList<Operator>());

-					leaf.getChildOperators().add(mapChildren.get(i));

-					i++;

-				}

-				i = 0;

-				for (Operator child : mapChildren) {

-					if (child.getParentOperators() == null

-							|| child.getParentOperators().size() == 0)

-						child.setParentOperators(new ArrayList<Operator>());

-					child.getParentOperators().add(leafs.get(i));

-					i++;

-				}

-			}

-		}

+        // mark this task as visited

+        this.tasksVisited.put(task, true);

+        return mapRootOps;

+    }

 

-		// mark this task as visited

-		this.tasksVisited.put(task, true);

-		return mapRootOps;

-	}

+    /**

+     * walk down from the given operators and collect their leaf operators

+     * 

+     * @param ops

+     * @param leaves

+     */

+    private void downToLeaves(List<Operator> ops, List<Operator> leaves) {

 

-	/**

-	 * down to leaf nodes

-	 * 

-	 * @param ops

-	 * @param leaves

-	 */

-	private void downToLeaves(List<Operator> ops, List<Operator> leaves) {

+        // Operator currentOp;

+        for (Operator op : ops) {

+            if (op != null && op.getChildOperators() != null && op.getChildOperators().size() > 0) {

+                downToLeaves(op.getChildOperators(), leaves);

+            } else {

+                if (op != null && leaves.indexOf(op) < 0)

+                    leaves.add(op);

+            }

+        }

+    }

 

-		// Operator currentOp;

-		for (Operator op : ops) {

-			if (op != null && op.getChildOperators() != null

-					&& op.getChildOperators().size() > 0) {

-				downToLeaves(op.getChildOperators(), leaves);

-			} else {

-				if (op != null && leaves.indexOf(op) < 0)

-					leaves.add(op);

-			}

-		}

-	}

+    private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {

+        for (Operator op : roots) {

+            List<Operator> children = op.getChildOperators();

+            if (children == null || children.size() <= 0) {

+                currentLeaves.add(op);

+            } else {

+                getLeaves(children, currentLeaves);

+            }

+        }

+    }

 

-	private void getLeaves(List<Operator> roots, List<Operator> currentLeaves) {

-		for (Operator op : roots) {

-			List<Operator> children = op.getChildOperators();

-			if (children == null || children.size() <= 0) {

-				currentLeaves.add(op);

-			} else {

-				getLeaves(children, currentLeaves);

-			}

-		}

-	}

+    private void executeHyracksJob(JobSpecification job) throws Exception {

 

-	private void executeHyraxJob(JobSpecification job) throws Exception {

-		String ipAddress = conf.get("hive.hyracks.host");

-		int port = Integer.parseInt(conf.get("hive.hyracks.port"));

-		String applicationName = conf.get("hive.hyracks.app");

-		// System.out.println("connect to " + ipAddress + " " + port);

+        /**

+         * load the properties file if it is not loaded

+         */

+        if (clusterProps == null) {

+            clusterProps = new Properties();

+            InputStream confIn = new FileInputStream(clusterPropertiesPath);

+            clusterProps.load(confIn);

+            confIn.close();

+        }

 

-		IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);

+        Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");

+        BufferedReader ipReader = new BufferedReader(new InputStreamReader(

+                new DataInputStream(process.getInputStream())));

+        String ipAddress = ipReader.readLine();

+        ipReader.close();

+        int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

+        String applicationName = "hivesterix";

 

-		// System.out.println("get connected");

-		long start = System.currentTimeMillis();

-		JobId jobId = hcc.startJob(applicationName, job);

-		hcc.waitForCompletion(jobId);

+        IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);

+        long start = System.currentTimeMillis();

+        JobId jobId = hcc.startJob(applicationName, job);

+        hcc.waitForCompletion(jobId);

 

-		// System.out.println("job finished: " + jobId.toString());

-		// call all leave nodes to end

-		for (Operator leaf : leaveOps) {

-			jobClose(leaf);

-		}

+        // System.out.println("job finished: " + jobId.toString());

+        // close all leaf nodes to end the job

+        for (Operator leaf : leaveOps) {

+            jobClose(leaf);

+        }

 

-		long end = System.currentTimeMillis();

-		System.err.println(start + " " + end + " " + (end - start));

-	}

+        long end = System.currentTimeMillis();

+        System.err.println(start + " " + end + " " + (end - start));

+    }

 

-	/**

-	 * mv to final directory on hdfs (not real final)

-	 * 

-	 * @param leaf

-	 * @throws Exception

-	 */

-	private void jobClose(Operator leaf) throws Exception {

-		FileSinkOperator fsOp = (FileSinkOperator) leaf;

-		FileSinkDesc desc = fsOp.getConf();

-		boolean isNativeTable = !desc.getTableInfo().isNonNative();

-		if ((conf != null) && isNativeTable) {

-			String specPath = desc.getDirName();

-			DynamicPartitionCtx dpCtx = desc.getDynPartCtx();

-			// for 0.7.0

-			fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);

-			// for 0.8.0

-			// Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,

-			// desc);

-		}

-	}

+    /**

+     * move the output to its final directory on HDFS (not the real final one)

+     * 

+     * @param leaf

+     * @throws Exception

+     */

+    private void jobClose(Operator leaf) throws Exception {

+        FileSinkOperator fsOp = (FileSinkOperator) leaf;

+        FileSinkDesc desc = fsOp.getConf();

+        boolean isNativeTable = !desc.getTableInfo().isNonNative();

+        if ((conf != null) && isNativeTable) {

+            String specPath = desc.getDirName();

+            DynamicPartitionCtx dpCtx = desc.getDynPartCtx();

+            // for 0.7.0

+            fsOp.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx);

+            // for 0.8.0

+            // Utilities.mvFileToFinalPath(specPath, conf, true, LOG, dpCtx,

+            // desc);

+        }

+    }

 }

diff --git a/hivesterix/hivesterix-dist/src/main/resources/conf/configuration.xsl b/hivesterix/hivesterix-dist/src/main/resources/conf/configuration.xsl
new file mode 100644
index 0000000..377cdbe
--- /dev/null
+++ b/hivesterix/hivesterix-dist/src/main/resources/conf/configuration.xsl
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
+<xsl:output method="html"/>
+<xsl:template match="configuration">
+<html>
+<body>
+<table border="1">
+<tr>
+ <td>name</td>
+ <td>value</td>
+ <td>description</td>
+</tr>
+<xsl:for-each select="property">
+<tr>
+  <td><a name="{name}"><xsl:value-of select="name"/></a></td>
+  <td><xsl:value-of select="value"/></td>
+  <td><xsl:value-of select="description"/></td>
+</tr>
+</xsl:for-each>
+</table>
+</body>
+</html>
+</xsl:template>
+</xsl:stylesheet>
diff --git a/hivesterix/hivesterix-dist/src/main/resources/conf/hive-default.xml b/hivesterix/hivesterix-dist/src/main/resources/conf/hive-default.xml
new file mode 100644
index 0000000..587eede
--- /dev/null
+++ b/hivesterix/hivesterix-dist/src/main/resources/conf/hive-default.xml
@@ -0,0 +1,758 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+
+	<!-- Hive Configuration can either be stored in this file or in the hadoop 
+		configuration files -->
+	<!-- that are implied by Hadoop setup variables. -->
+	<!-- Aside from Hadoop setup variables - this file is provided as a convenience 
+		so that Hive -->
+	<!-- users do not have to edit hadoop configuration files (that may be managed 
+		as a centralized -->
+	<!-- resource). -->
+
+	<!-- Hive Execution Parameters -->
+	<property>
+		<name>mapred.reduce.tasks</name>
+		<value>-1</value>
+		<description>The default number of reduce tasks per job. Typically set
+			to a prime close to the number of available hosts. Ignored when
+			mapred.job.tracker is "local". Hadoop set this to 1 by default,
+			whereas hive uses -1 as its default value.
+			By setting this property to -1, Hive will automatically figure out what
+			should be the number of reducers.
+  </description>
+	</property>
+        <property>
+		<name>hive.hyracks.connectorpolicy</name>
+		<value>PIPELINING</value>
+        </property>
+
+	<property>
+		<name>hive.hyracks.parrallelism</name>
+		<value>4</value>
+	</property>
+
+	<property>
+		<name>hive.algebricks.groupby.external</name>
+		<value>true</value>
+	</property>
+	
+	<property>
+		<name>hive.algebricks.groupby.external.memory</name>
+		<value>33554432</value>
+	</property>
+	
+	<property>
+		<name>hive.algebricks.sort.memory</name>
+		<value>33554432</value>
+	</property>
+
+	<property>
+		<name>hive.exec.reducers.bytes.per.reducer</name>
+		<value>1000000000</value>
+		<description>size per reducer. The default is 1G, i.e. if the input size
+			is 10G, it will use 10 reducers.</description>
+	</property>
+
+	<property>
+		<name>hive.exec.reducers.max</name>
+		<value>999</value>
+		<description>max number of reducers that will be used. If the one
+			specified in the configuration parameter mapred.reduce.tasks is
+			negative, hive will use this one as the max number of reducers when
+			automatically determining the number of reducers.</description>
+	</property>
+
+	<property>
+		<name>hive.exec.scratchdir</name>
+		<value>/hive-${user.name}</value>
+		<description>Scratch space for Hive jobs</description>
+	</property>
+
+	<property>
+		<name>hive.test.mode</name>
+		<value>false</value>
+		<description>whether hive is running in test mode. If yes, it turns on
+			sampling and prefixes the output tablename</description>
+	</property>
+
+	<property>
+		<name>hive.test.mode.prefix</name>
+		<value>test_</value>
+		<description>if hive is running in test mode, prefixes the output
+			table by this string</description>
+	</property>
+
+	<!-- If the input table is not bucketed, the denominator of the tablesample 
+		is determined by the parameter below -->
+	<!-- For example, the following query: -->
+	<!-- INSERT OVERWRITE TABLE dest -->
+	<!-- SELECT col1 from src -->
+	<!-- would be converted to -->
+	<!-- INSERT OVERWRITE TABLE test_dest -->
+	<!-- SELECT col1 from src TABLESAMPLE (BUCKET 1 out of 32 on rand(1)) -->
+	<property>
+		<name>hive.test.mode.samplefreq</name>
+		<value>32</value>
+		<description>if hive is running in test mode and table is not
+			bucketed, sampling frequency</description>
+	</property>
+
+	<property>
+		<name>hive.test.mode.nosamplelist</name>
+		<value></value>
+		<description>if hive is running in test mode, don't sample the above
+			comma-separated list of tables</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.local</name>
+		<value>true</value>
+		<description>controls whether to connect to remote metastore server or
+			open a new metastore server in Hive Client JVM</description>
+	</property>
+
+	<property>
+		<name>javax.jdo.option.ConnectionURL</name>
+		<value>jdbc:derby:;databaseName=metastore_db;create=true</value>
+		<description>JDBC connect string for a JDBC metastore</description>
+	</property>
+
+	<property>
+		<name>javax.jdo.option.ConnectionDriverName</name>
+		<value>org.apache.derby.jdbc.EmbeddedDriver</value>
+		<description>Driver class name for a JDBC metastore</description>
+	</property>
+
+	<property>
+		<name>javax.jdo.PersistenceManagerFactoryClass</name>
+		<value>org.datanucleus.jdo.JDOPersistenceManagerFactory</value>
+		<description>class implementing the jdo persistence</description>
+	</property>
+
+	<property>
+		<name>datanucleus.connectionPoolingType</name>
+		<value>DBCP</value>
+		<description>Uses a DBCP connection pool for JDBC metastore
+		</description>
+	</property>
+
+	<property>
+		<name>javax.jdo.option.DetachAllOnCommit</name>
+		<value>true</value>
+		<description>detaches all objects from session so that they can be
+			used after transaction is committed</description>
+	</property>
+
+	<property>
+		<name>javax.jdo.option.NonTransactionalRead</name>
+		<value>true</value>
+		<description>reads outside of transactions</description>
+	</property>
+
+	<property>
+		<name>javax.jdo.option.ConnectionUserName</name>
+		<value>APP</value>
+		<description>username to use against metastore database</description>
+	</property>
+
+	<property>
+		<name>javax.jdo.option.ConnectionPassword</name>
+		<value>mine</value>
+		<description>password to use against metastore database</description>
+	</property>
+
+	<property>
+		<name>datanucleus.validateTables</name>
+		<value>false</value>
+		<description>validates existing schema against code. turn this on if
+			you want to verify existing schema </description>
+	</property>
+
+	<property>
+		<name>datanucleus.validateColumns</name>
+		<value>false</value>
+		<description>validates existing schema against code. turn this on if
+			you want to verify existing schema </description>
+	</property>
+
+	<property>
+		<name>datanucleus.validateConstraints</name>
+		<value>false</value>
+		<description>validates existing schema against code. turn this on if
+			you want to verify existing schema </description>
+	</property>
+
+	<property>
+		<name>datanucleus.storeManagerType</name>
+		<value>rdbms</value>
+		<description>metadata store type</description>
+	</property>
+
+	<property>
+		<name>datanucleus.autoCreateSchema</name>
+		<value>true</value>
+		<description>creates necessary schema on a startup if one doesn't
+			exist. set this to false, after creating it once</description>
+	</property>
+
+	<property>
+		<name>datanucleus.autoStartMechanismMode</name>
+		<value>checked</value>
+		<description>throw exception if metadata tables are incorrect
+		</description>
+	</property>
+
+	<property>
+		<name>datanucleus.transactionIsolation</name>
+		<value>read-committed</value>
+		<description>Default transaction isolation level for identity
+			generation. </description>
+	</property>
+
+	<property>
+		<name>datanucleus.cache.level2</name>
+		<value>false</value>
+		<description>Use a level 2 cache. Turn this off if metadata is changed
+			independently of hive metastore server</description>
+	</property>
+
+	<property>
+		<name>datanucleus.cache.level2.type</name>
+		<value>SOFT</value>
+		<description>SOFT=soft reference based cache, WEAK=weak reference
+			based cache.</description>
+	</property>
+
+	<property>
+		<name>datanucleus.identifierFactory</name>
+		<value>datanucleus</value>
+		<description>Name of the identifier factory to use when generating
+			table/column names etc. 'datanucleus' is used for backward
+			compatibility</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.warehouse.dir</name>
+		<value>/user/hivesterix</value>
+		<description>location of default database for the warehouse
+		</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.connect.retries</name>
+		<value>5</value>
+		<description>Number of retries while opening a connection to metastore
+		</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.rawstore.impl</name>
+		<value>org.apache.hadoop.hive.metastore.ObjectStore</value>
+		<description>Name of the class that implements
+			org.apache.hadoop.hive.metastore.rawstore interface. This class is
+			used for storage and retrieval of raw metadata objects such as table,
+			database</description>
+	</property>
+
+	<property>
+		<name>hive.default.fileformat</name>
+		<value>TextFile</value>
+		<description>Default file format for CREATE TABLE statement. Options
+			are TextFile and SequenceFile. Users can explicitly say CREATE TABLE
+			... STORED AS &lt;TEXTFILE|SEQUENCEFILE&gt; to override</description>
+	</property>
+
+	<property>
+		<name>hive.fileformat.check</name>
+		<value>true</value>
+		<description>Whether to check file format or not when loading data
+			files</description>
+	</property>
+
+	<property>
+		<name>hive.map.aggr</name>
+		<value>true</value>
+		<description>Whether to use map-side aggregation in Hive Group By
+			queries</description>
+	</property>
+
+	<property>
+		<name>hive.groupby.skewindata</name>
+		<value>false</value>
+		<description>Whether there is skew in data to optimize group by
+			queries</description>
+	</property>
+
+	<property>
+		<name>hive.groupby.mapaggr.checkinterval</name>
+		<value>100000</value>
+		<description>Number of rows after which size of the grouping
+			keys/aggregation classes is performed</description>
+	</property>
+
+	<property>
+		<name>hive.mapred.local.mem</name>
+		<value>0</value>
+		<description>For local mode, memory of the mappers/reducers
+		</description>
+	</property>
+
+	<property>
+		<name>hive.map.aggr.hash.percentmemory</name>
+		<value>0.5</value>
+		<description>Portion of total memory to be used by map-side group
+			aggregation hash table</description>
+	</property>
+
+	<property>
+		<name>hive.map.aggr.hash.min.reduction</name>
+		<value>0.5</value>
+		<description>Hash aggregation will be turned off if the ratio between
+			hash
+			table size and input rows is bigger than this number. Set to 1 to make
+			sure
+			hash aggregation is never turned off.</description>
+	</property>
+
+	<property>
+		<name>hive.optimize.cp</name>
+		<value>true</value>
+		<description>Whether to enable column pruner</description>
+	</property>
+
+	<property>
+		<name>hive.optimize.ppd</name>
+		<value>true</value>
+		<description>Whether to enable predicate pushdown</description>
+	</property>
+
+	<property>
+		<name>hive.optimize.pruner</name>
+		<value>true</value>
+		<description>Whether to enable the new partition pruner which depends
+			on predicate pushdown. If this is disabled,
+			the old partition pruner which is based on AST will be enabled.
+		</description>
+	</property>
+
+	<property>
+		<name>hive.optimize.groupby</name>
+		<value>true</value>
+		<description>Whether to enable the bucketed group by from bucketed
+			partitions/tables.</description>
+	</property>
+
+	<property>
+		<name>hive.join.emit.interval</name>
+		<value>1000</value>
+		<description>How many rows in the right-most join operand Hive should
+			buffer before emitting the join result. </description>
+	</property>
+
+	<property>
+		<name>hive.join.cache.size</name>
+		<value>25000</value>
+		<description>How many rows in the joining tables (except the streaming
+			table) should be cached in memory. </description>
+	</property>
+
+	<property>
+		<name>hive.mapjoin.bucket.cache.size</name>
+		<value>100</value>
+		<description>How many values in each keys in the map-joined table
+			should be cached in memory. </description>
+	</property>
+
+	<property>
+		<name>hive.mapjoin.maxsize</name>
+		<value>100000</value>
+		<description>Maximum # of rows of the small table that can be handled
+			by map-side join. If the size is reached and hive.task.progress is
+			set, a fatal error counter is set and the job will be killed.
+		</description>
+	</property>
+
+	<property>
+		<name>hive.mapjoin.cache.numrows</name>
+		<value>25000</value>
+		<description>How many rows should be cached by jdbm for map join.
+		</description>
+	</property>
+
+	<property>
+		<name>hive.optimize.skewjoin</name>
+		<value>false</value>
+		<description>Whether to enable skew join optimization. </description>
+	</property>
+
+	<property>
+		<name>hive.skewjoin.key</name>
+		<value>100000</value>
+		<description>Determine if we get a skew key in join. If we see more
+			than the specified number of rows with the same key in join operator,
+			we treat the key as a skew join key. </description>
+	</property>
+
+	<property>
+		<name>hive.skewjoin.mapjoin.map.tasks</name>
+		<value>10000</value>
+		<description> Determine the number of map task used in the follow up
+			map join job
+			for a skew join. It should be used together with
+			hive.skewjoin.mapjoin.min.split
+			to perform a fine grained control.</description>
+	</property>
+
+	<property>
+		<name>hive.skewjoin.mapjoin.min.split</name>
+		<value>33554432</value>
+		<description> Determine the number of map task at most used in the
+			follow up map join job
+			for a skew join by specifying the minimum split size. It should be used
+			together with
+			hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.</description>
+	</property>
+
+	<property>
+		<name>hive.mapred.mode</name>
+		<value>nonstrict</value>
+		<description>The mode in which the hive operations are being
+			performed. In strict mode, some risky queries are not allowed to run
+		</description>
+	</property>
+
+	<property>
+		<name>hive.exec.script.maxerrsize</name>
+		<value>100000</value>
+		<description>Maximum number of bytes a script is allowed to emit to
+			standard error (per map-reduce task). This prevents runaway scripts
+			from filling logs partitions to capacity </description>
+	</property>
+
+	<property>
+		<name>hive.exec.script.allow.partial.consumption</name>
+		<value>false</value>
+		<description> When enabled, this option allows a user script to exit
+			successfully without consuming all the data from the standard input.
+		</description>
+	</property>
+
+	<property>
+		<name>hive.script.operator.id.env.var</name>
+		<value>HIVE_SCRIPT_OPERATOR_ID</value>
+		<description> Name of the environment variable that holds the unique
+			script operator ID in the user's transform function (the custom
+			mapper/reducer that the user has specified in the query)
+		</description>
+	</property>
+
+	<property>
+		<name>hive.exec.compress.output</name>
+		<value>false</value>
+		<description> This controls whether the final outputs of a query (to a
+			local/hdfs file or a hive table) is compressed. The compression codec
+			and other options are determined from hadoop config variables
+			mapred.output.compress* </description>
+	</property>
+
+	<property>
+		<name>hive.exec.compress.intermediate</name>
+		<value>false</value>
+		<description> This controls whether intermediate files produced by
+			hive between multiple map-reduce jobs are compressed. The compression
+			codec and other options are determined from hadoop config variables
+			mapred.output.compress* </description>
+	</property>
+
+	<property>
+		<name>hive.exec.parallel</name>
+		<value>false</value>
+		<description>Whether to execute jobs in parallel</description>
+	</property>
+
+	<property>
+		<name>hive.exec.parallel.thread.number</name>
+		<value>8</value>
+		<description>How many jobs at most can be executed in parallel
+		</description>
+	</property>
+
+	<property>
+		<name>hive.hwi.war.file</name>
+		<value>lib\hive-hwi-0.7.0.war</value>
+		<description>This sets the path to the HWI war file, relative to
+			${HIVE_HOME}. </description>
+	</property>
+
+	<property>
+		<name>hive.hwi.listen.host</name>
+		<value>0.0.0.0</value>
+		<description>This is the host address the Hive Web Interface will
+			listen on</description>
+	</property>
+
+	<property>
+		<name>hive.hwi.listen.port</name>
+		<value>9999</value>
+		<description>This is the port the Hive Web Interface will listen on
+		</description>
+	</property>
+
+	<property>
+		<name>hive.exec.pre.hooks</name>
+		<value></value>
+		<description>Pre Execute Hook for Tests</description>
+	</property>
+
+	<property>
+		<name>hive.merge.mapfiles</name>
+		<value>true</value>
+		<description>Merge small files at the end of a map-only job
+		</description>
+	</property>
+
+	<property>
+		<name>hive.merge.mapredfiles</name>
+		<value>false</value>
+		<description>Merge small files at the end of a map-reduce job
+		</description>
+	</property>
+
+	<property>
+		<name>hive.heartbeat.interval</name>
+		<value>1000</value>
+		<description>Send a heartbeat after this interval - used by mapjoin
+			and filter operators</description>
+	</property>
+
+	<property>
+		<name>hive.merge.size.per.task</name>
+		<value>256000000</value>
+		<description>Size of merged files at the end of the job</description>
+	</property>
+
+	<property>
+		<name>hive.merge.size.smallfiles.avgsize</name>
+		<value>16000000</value>
+		<description>When the average output file size of a job is less than
+			this number, Hive will start an additional map-reduce job to merge
+			the output files into bigger files. This is only done for map-only
+			jobs if hive.merge.mapfiles is true, and for map-reduce jobs if
+			hive.merge.mapredfiles is true.</description>
+	</property>
+
+	<property>
+		<name>hive.script.auto.progress</name>
+		<value>false</value>
+		<description>Whether Hive Transform/Map/Reduce Clause should
+			automatically send progress information to TaskTracker to avoid the
+			task getting killed because of inactivity. Hive sends progress
+			information when the script is outputting to stderr. This option
+			removes the need of periodically producing stderr messages, but users
+			should be cautious because this may prevent infinite loops in the
+			scripts from being killed by TaskTracker.  </description>
+	</property>
+
+	<property>
+		<name>hive.script.serde</name>
+		<value>org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe</value>
+		<description>The default serde for transmitting input data to and
+			reading output data from the user scripts. </description>
+	</property>
+
+	<property>
+		<name>hive.script.recordreader</name>
+		<value>org.apache.hadoop.hive.ql.exec.TextRecordReader</value>
+		<description>The default record reader for reading data from the user
+			scripts. </description>
+	</property>
+
+	<property>
+		<name>hive.script.recordwriter</name>
+		<value>org.apache.hadoop.hive.ql.exec.TextRecordWriter</value>
+		<description>The default record writer for writing data to the user
+			scripts. </description>
+	</property>
+
+	<property>
+		<name>hive.input.format</name>
+		<value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
+		<description>The default input format, if it is not specified, the
+			system assigns it. It is set to HiveInputFormat for hadoop versions
+			17, 18 and 19, whereas it is set to CombinedHiveInputFormat for
+			hadoop 20. The user can always overwrite it - if there is a bug in
+			CombinedHiveInputFormat, it can always be manually set to
+			HiveInputFormat. </description>
+	</property>
+
+	<property>
+		<name>hive.udtf.auto.progress</name>
+		<value>false</value>
+		<description>Whether Hive should automatically send progress
+			information to TaskTracker when using UDTF's to prevent the task
+			getting killed because of inactivity. Users should be cautious
+			because this may prevent TaskTracker from killing tasks with infinite
+			loops.  </description>
+	</property>
+
+	<property>
+		<name>hive.mapred.reduce.tasks.speculative.execution</name>
+		<value>true</value>
+		<description>Whether speculative execution for reducers should be
+			turned on. </description>
+	</property>
+
+	<property>
+		<name>hive.exec.counters.pull.interval</name>
+		<value>1000</value>
+		<description>The interval with which to poll the JobTracker for the
+			counters of the running job. The smaller it is the more load there
+			will be on the jobtracker, the higher it is the less granular the
+			counters caught will be.</description>
+	</property>
+
+	<property>
+		<name>hive.enforce.bucketing</name>
+		<value>false</value>
+		<description>Whether bucketing is enforced. If true, while inserting
+			into the table, bucketing is enforced. </description>
+	</property>
+
+	<property>
+		<name>hive.enforce.sorting</name>
+		<value>false</value>
+		<description>Whether sorting is enforced. If true, while inserting
+			into the table, sorting is enforced. </description>
+	</property>
+
+	<property>
+		<name>hive.metastore.ds.connection.url.hook</name>
+		<value></value>
+		<description>Name of the hook to use for retrieving the JDO connection
+			URL. If empty, the value in javax.jdo.option.ConnectionURL is used
+		</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.ds.retry.attempts</name>
+		<value>1</value>
+		<description>The number of times to retry a metastore call if there
+			were a connection error</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.ds.retry.interval</name>
+		<value>1000</value>
+		<description>The number of milliseconds between metastore retry
+			attempts</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.server.min.threads</name>
+		<value>200</value>
+		<description>Minimum number of worker threads in the Thrift server's
+			pool.</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.server.max.threads</name>
+		<value>100000</value>
+		<description>Maximum number of worker threads in the Thrift server's
+			pool.</description>
+	</property>
+
+	<property>
+		<name>hive.metastore.server.tcp.keepalive</name>
+		<value>true</value>
+		<description>Whether to enable TCP keepalive for the metastore server.
+			Keepalive will prevent accumulation of half-open connections.
+		</description>
+	</property>
+
+	<property>
+		<name>hive.optimize.reducededuplication</name>
+		<value>true</value>
+		<description>Remove extra map-reduce jobs if the data is already
+			clustered by the same key which needs to be used again. This should
+			always be set to true. Since it is a new feature, it has been made
+			configurable.</description>
+	</property>
+
+	<property>
+		<name>hive.exec.dynamic.partition</name>
+		<value>false</value>
+		<description>Whether or not to allow dynamic partitions in DML/DDL.
+		</description>
+	</property>
+
+	<property>
+		<name>hive.exec.dynamic.partition.mode</name>
+		<value>strict</value>
+		<description>In strict mode, the user must specify at least one static
+			partition in case the user accidentally overwrites all partitions.
+		</description>
+	</property>
+
+	<property>
+		<name>hive.exec.max.dynamic.partitions</name>
+		<value>1000</value>
+		<description>Maximum number of dynamic partitions allowed to be
+			created in total.</description>
+	</property>
+
+	<property>
+		<name>hive.exec.max.dynamic.partitions.pernode</name>
+		<value>100</value>
+		<description>Maximum number of dynamic partitions allowed to be
+			created in each mapper/reducer node.</description>
+	</property>
+
+	<property>
+		<name>hive.default.partition.name</name>
+		<value>__HIVE_DEFAULT_PARTITION__</value>
+		<description>The default partition name in case the dynamic partition
+			column value is null/empty string or any other value that cannot be
+			escaped. This value must not contain any special character used in
+			HDFS URI (e.g., ':', '%', '/' etc). The user has to be aware that the
+			dynamic partition value should not contain this value to avoid
+			confusions.</description>
+	</property>
+
+	<property>
+		<name>fs.har.impl</name>
+		<value>org.apache.hadoop.hive.shims.HiveHarFileSystem</value>
+		<description>The implementation for accessing Hadoop Archives. Note
+			that this won't be applicable to Hadoop versions earlier than 0.20
+		</description>
+	</property>
+
+	<property>
+		<name>hive.archive.enabled</name>
+		<value>false</value>
+		<description>Whether archiving operations are permitted</description>
+	</property>
+
+	<property>
+		<name>hive.archive.har.parentdir.settable</name>
+		<value>false</value>
+		<description>In new Hadoop versions, the parent directory must be set
+			while
+			creating a HAR. Because this functionality is hard to detect with just
+			version
+			numbers, this conf var needs to be set manually.</description>
+	</property>
+
+	<!-- HBase Storage Handler Parameters -->
+
+	<property>
+		<name>hive.hbase.wal.enabled</name>
+		<value>true</value>
+		<description>Whether writes to HBase should be forced to the
+			write-ahead log. Disabling this improves HBase write performance at
+			the risk of lost writes in case of a crash.</description>
+	</property>
+
+</configuration>
diff --git a/hivesterix/hivesterix-dist/src/main/resources/conf/hive-log4j.properties b/hivesterix/hivesterix-dist/src/main/resources/conf/hive-log4j.properties
new file mode 100644
index 0000000..784a274
--- /dev/null
+++ b/hivesterix/hivesterix-dist/src/main/resources/conf/hive-log4j.properties
@@ -0,0 +1,58 @@
+#------------------------------------------------------------------------------
+#
+#  The following properties set the logging levels and log appender.  The
+#  log4j.rootCategory variable defines the default log level and one or more
+#  appenders.  For the console, use 'S'.  For the daily rolling file, use 'R'.
+#  For an HTML formatted log, use 'H'.
+#
+#  To override the default (rootCategory) log level, define a property of the
+#  form (see below for available values):
+#
+#        log4j.logger.<logger name> = <level>
+#
+#    Available logger names:
+#      TODO
+#
+#    Possible Log Levels:
+#      FATAL, ERROR, WARN, INFO, DEBUG
+#
+#------------------------------------------------------------------------------
+log4j.rootCategory=INFO, S
+
+log4j.logger.com.dappit.Dapper.parser=ERROR
+log4j.logger.org.w3c.tidy=FATAL
+
+#------------------------------------------------------------------------------
+#
+#  The following properties configure the console (stdout) appender.
+#  See http://logging.apache.org/log4j/docs/api/index.html for details.
+#
+#------------------------------------------------------------------------------
+log4j.appender.S = org.apache.log4j.ConsoleAppender
+log4j.appender.S.layout = org.apache.log4j.PatternLayout
+log4j.appender.S.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
+
+#------------------------------------------------------------------------------
+#
+#  The following properties configure the Daily Rolling File appender.
+#  See http://logging.apache.org/log4j/docs/api/index.html for details.
+#
+#------------------------------------------------------------------------------
+log4j.appender.R = org.apache.log4j.DailyRollingFileAppender
+log4j.appender.R.File = logs/bensApps.log
+log4j.appender.R.Append = true
+log4j.appender.R.DatePattern = '.'yyy-MM-dd
+log4j.appender.R.layout = org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
+
+#------------------------------------------------------------------------------
+#
+#  The following properties configure the Rolling File appender in HTML.
+#  See http://logging.apache.org/log4j/docs/api/index.html for details.
+#
+#------------------------------------------------------------------------------
+log4j.appender.H = org.apache.log4j.RollingFileAppender
+log4j.appender.H.File = logs/bensApps.html
+log4j.appender.H.MaxFileSize = 100KB
+log4j.appender.H.Append = false
+log4j.appender.H.layout = org.apache.log4j.HTMLLayout
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/startCluster.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/startCluster.sh
index a0c2063..6aa9161 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/startCluster.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/startCluster.sh
@@ -1,3 +1,19 @@
 bin/startcc.sh
 sleep 5
 bin/startAllNCs.sh
+
+. conf/cluster.properties
+# Locate the Hyracks CLI launcher; the glob matches whichever version is built.
+hyrackcmd=`ls ${HYRACKS_HOME}/hyracks-cli/target/hyracks-cli-*-binary-assembly/bin/hyrackscli`
+# Locate the hivesterix application zip in the parent directory.
+appzip=`ls $PWD/../hivesterix-dist-*-binary-assembly.zip`
+# Operands are quoted: an empty unquoted value turns the check into `[ -f ]`, which is always true.
+[ -f "$hyrackcmd" ] || { echo "Hyracks commandline is missing"; exit 1;}
+[ -f "$appzip" ] || { echo "Hivesterix binary-assembly.zip is missing"; exit 1;}
+
+CCHOST_NAME=`cat conf/master`
+
+IPADDR=`bin/getip.sh`
+echo "connect to \"${IPADDR}:${CC_CLIENTPORT}\"; create application hivesterix \"$appzip\";" | "$hyrackcmd"
+echo ""
+
diff --git a/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java b/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
index 50d8529..e455527 100644
--- a/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
+++ b/hivesterix/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
@@ -1,15 +1,20 @@
 package edu.uci.ics.hivesterix.test.base;

 

 import java.io.BufferedReader;

+import java.io.DataInputStream;

 import java.io.File;

+import java.io.FileInputStream;

 import java.io.FileNotFoundException;

 import java.io.FileReader;

 import java.io.IOException;

+import java.io.InputStream;

+import java.io.InputStreamReader;

 import java.util.ArrayList;

 import java.util.HashMap;

 import java.util.Iterator;

 import java.util.List;

 import java.util.Map;

+import java.util.Properties;

 

 import junit.framework.TestSuite;

 

@@ -39,6 +44,9 @@
     private static final String PATH_TO_CLUSTER_CONF = "src/test/resources/runtimefunctionts/hive/conf/topology.xml";

     private static final String PATH_TO_DATA = "src/test/resources/runtimefunctionts/data/";

 

+    private static final String clusterPropertiesPath = "conf/cluster.properties";

+    private Properties clusterProps;

+

     private MiniDFSCluster dfsCluster;

     private MiniMRCluster mrCluster;

 

@@ -92,10 +100,23 @@
         HiveConf hconf = new HiveConf(SessionState.class);

         hconf.addResource(new Path(PATH_TO_HIVE_CONF));

         SessionState.start(hconf);

-        String ipAddress = hconf.get("hive.hyracks.host");

-        int clientPort = Integer.parseInt(hconf.get("hive.hyracks.port"));

-        int netPort = clientPort + 1;

-        String applicationName = hconf.get("hive.hyracks.app");

+        /**

+         * load the properties file if it is not loaded

+         */

+        if (clusterProps == null) {

+            clusterProps = new Properties();

+            InputStream confIn = new FileInputStream(clusterPropertiesPath);

+            clusterProps.load(confIn);

+            confIn.close();

+        }

+        Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");

+        BufferedReader ipReader = new BufferedReader(new InputStreamReader(

+                new DataInputStream(process.getInputStream())));

+        String ipAddress = ipReader.readLine();

+        ipReader.close();

+        int clientPort = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

+        int netPort = Integer.parseInt(clusterProps.getProperty("CC_CLUSTERPORT"));

+        String applicationName = "hivesterix";

 

         // start hyracks cc

         CCConfig ccConfig = new CCConfig();

diff --git a/hivesterix/hivesterix-optimizer/src/test/java/edu/uci/ics/hyracks/AppTest.java b/hivesterix/hivesterix-optimizer/src/test/java/edu/uci/ics/hyracks/AppTest.java
deleted file mode 100644
index 0c701c8..0000000
--- a/hivesterix/hivesterix-optimizer/src/test/java/edu/uci/ics/hyracks/AppTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package edu.uci.ics.hyracks;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest 
-    extends TestCase
-{
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public AppTest( String testName )
-    {
-        super( testName );
-    }
-
-    /**
-     * @return the suite of tests being tested
-     */
-    public static Test suite()
-    {
-        return new TestSuite( AppTest.class );
-    }
-
-    /**
-     * Rigourous Test :-)
-     */
-    public void testApp()
-    {
-        assertTrue( true );
-    }
-}