Merged from fullstack_asterix_stabilization to fullstack_lsm_staging, -r3100:3171.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3173 123451ca-8445-de46-9d55-352943316053
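
Summary of the merged changes: the cluster controller is now discovered by reading the master host name from conf/master and resolving it through java.net.InetAddress, replacing the getip.sh subprocess; the Hyracks client connection is created once and cached; the application-name client calls (createApplication, startJob(name, job)) are dropped in favor of startJob(job); result distribution is wired through (-result-ip-address in startnc.sh, NCConfig.datasetIPAddress, a getResultHandleRuntime stub in HiveMetaDataProvider); the HDFS read scheduler becomes topology-aware; and HiveFileWritePushRuntime is reindented with no behavioral change.
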
diff --git a/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java b/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
index 666d361..648deb6 100644
--- a/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
+++ b/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hivesterix.common.config;
 
 import java.io.BufferedReader;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -28,6 +27,7 @@
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public class ConfUtil {
     private static final String clusterPropertiesPath = "conf/cluster.properties";
+    private static final String masterFilePath = "conf/master";
 
     private static JobConf job;
     private static HiveConf hconf;
@@ -134,15 +134,27 @@
                 clusterProps.load(confIn);
                 confIn.close();
             }
-            Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");
-            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new DataInputStream(
-                    process.getInputStream())));
-            String ipAddress = ipReader.readLine();
-            ipReader.close();
-            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
-            int mpl = Integer.parseInt(hconf.get("hive.hyracks.parrallelism"));
 
-            hcc = new HyracksConnection(ipAddress, port);
+            if (hcc == null) {
+                BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
+                String masterNode = ipReader.readLine();
+                ipReader.close();
+
+                InetAddress[] ips = InetAddress.getAllByName(masterNode);
+                int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
+                for (InetAddress ip : ips) {
+                    if (ip.getAddress().length <= 4) {
+                        try {
+                            hcc = new HyracksConnection(ip.getHostAddress(), port);
+                            break;
+                        } catch (Exception e) {
+                            continue;
+                        }
+                    }
+                }
+            }
+
+            int mpl = Integer.parseInt(hconf.get("hive.hyracks.parrallelism"));
             topology = hcc.getClusterTopology();
             ncNameToNcInfos = hcc.getNodeControllerInfos();
             NCs = new String[ncNameToNcInfos.size() * mpl];
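
Note on the loop above: InetAddress.getAddress() returns 4 bytes for an IPv4 address and 16 for IPv6, so the length check selects IPv4 addresses only. A minimal standalone sketch of the same resolution strategy, assuming conf/master holds the master host name on its first line (class and method names are illustrative, not part of this patch):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.net.Inet4Address;
    import java.net.InetAddress;

    public class MasterResolver {
        // Resolve the host named in conf/master to its first IPv4 address.
        public static String resolveIPv4(String masterFilePath) throws Exception {
            BufferedReader reader = new BufferedReader(new FileReader(masterFilePath));
            String masterNode = reader.readLine();
            reader.close();
            for (InetAddress ip : InetAddress.getAllByName(masterNode)) {
                if (ip instanceof Inet4Address) { // same effect as getAddress().length <= 4
                    return ip.getHostAddress();
                }
            }
            throw new IllegalStateException("no IPv4 address found for " + masterNode);
        }
    }
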
diff --git a/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index c39485f..53c9d42 100644
--- a/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -1,12 +1,12 @@
 package edu.uci.ics.hivesterix.runtime.exec;
 
 import java.io.BufferedReader;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.io.Serializable;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -80,6 +80,7 @@
 
     private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());
     private static final String clusterPropertiesPath = "conf/cluster.properties";
+    private static final String masterFilePath = "conf/master";
 
     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();
@@ -151,6 +152,11 @@
      */
     private Properties clusterProps;
 
+    /**
+     * the Hyracks client connection
+     */
+    private IHyracksClientConnection hcc;
+
     public HyracksExecutionEngine(HiveConf conf) {
         this.conf = conf;
         init(conf);
@@ -533,17 +539,27 @@
             confIn.close();
         }
 
-        Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");
-        BufferedReader ipReader = new BufferedReader(new InputStreamReader(
-                new DataInputStream(process.getInputStream())));
-        String ipAddress = ipReader.readLine();
-        ipReader.close();
-        int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
-        String applicationName = "hivesterix";
+        if (hcc == null) {
+            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
+            String masterNode = ipReader.readLine();
+            ipReader.close();
 
-        IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
+            InetAddress[] ips = InetAddress.getAllByName(masterNode);
+            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
+            for (InetAddress ip : ips) {
+                if (ip.getAddress().length <= 4) {
+                    try {
+                        hcc = new HyracksConnection(ip.getHostAddress(), port);
+                        break;
+                    } catch (Exception e) {
+                        continue;
+                    }
+                }
+            }
+        }
+
         long start = System.currentTimeMillis();
-        JobId jobId = hcc.startJob(applicationName, job);
+        JobId jobId = hcc.startJob(job);
         hcc.waitForCompletion(jobId);
 
         // System.out.println("job finished: " + jobId.toString());
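
Note: the Hyracks client API on this branch no longer takes an application name, so jobs are submitted with hcc.startJob(job); the connection itself is now a field that is built lazily on first use and reused for subsequent queries.
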
diff --git a/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java b/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
new file mode 100644
index 0000000..7920001
--- /dev/null
+++ b/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.lazy.objectinspector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * ObjectInspectorFactory is the primary way to create new ObjectInspector
+ * instances.
+ * SerDe classes should call the static functions in this library to create an
+ * ObjectInspector to return to the caller of SerDe2.getObjectInspector().
+ * The reason for having caches here is that ObjectInspectors have no
+ * internal state, so ObjectInspectors built with the same construction
+ * parameters should resolve to exactly the same ObjectInspector instance.
+ */
+public final class LazyObjectInspectorFactory {
+
+    static ConcurrentHashMap<ArrayList<Object>, LazySimpleStructObjectInspector> cachedLazySimpleStructObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazySimpleStructObjectInspector>();
+
+    public static LazySimpleStructObjectInspector getLazySimpleStructObjectInspector(List<String> structFieldNames,
+            List<ObjectInspector> structFieldObjectInspectors, byte separator, Text nullSequence,
+            boolean lastColumnTakesRest, boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(structFieldNames);
+        signature.add(structFieldObjectInspectors);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(lastColumnTakesRest));
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazySimpleStructObjectInspector result = cachedLazySimpleStructObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazySimpleStructObjectInspector(structFieldNames, structFieldObjectInspectors, separator,
+                    nullSequence, lastColumnTakesRest, escaped, escapeChar);
+            cachedLazySimpleStructObjectInspector.put(signature, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<ArrayList<Object>, LazyListObjectInspector> cachedLazySimpleListObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazyListObjectInspector>();
+
+    public static LazyListObjectInspector getLazySimpleListObjectInspector(ObjectInspector listElementObjectInspector,
+            byte separator, Text nullSequence, boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(listElementObjectInspector);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyListObjectInspector result = cachedLazySimpleListObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyListObjectInspector(listElementObjectInspector, separator, nullSequence, escaped,
+                    escapeChar);
+            cachedLazySimpleListObjectInspector.put(signature, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<ArrayList<Object>, LazyMapObjectInspector> cachedLazySimpleMapObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazyMapObjectInspector>();
+
+    public static LazyMapObjectInspector getLazySimpleMapObjectInspector(ObjectInspector mapKeyObjectInspector,
+            ObjectInspector mapValueObjectInspector, byte itemSeparator, byte keyValueSeparator, Text nullSequence,
+            boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(mapKeyObjectInspector);
+        signature.add(mapValueObjectInspector);
+        signature.add(Byte.valueOf(itemSeparator));
+        signature.add(Byte.valueOf(keyValueSeparator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyMapObjectInspector result = cachedLazySimpleMapObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyMapObjectInspector(mapKeyObjectInspector, mapValueObjectInspector, itemSeparator,
+                    keyValueSeparator, nullSequence, escaped, escapeChar);
+            cachedLazySimpleMapObjectInspector.put(signature, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<List<Object>, LazyUnionObjectInspector> cachedLazyUnionObjectInspector = new ConcurrentHashMap<List<Object>, LazyUnionObjectInspector>();
+
+    public static LazyUnionObjectInspector getLazyUnionObjectInspector(List<ObjectInspector> ois, byte separator,
+            Text nullSequence, boolean escaped, byte escapeChar) {
+        List<Object> signature = new ArrayList<Object>();
+        signature.add(ois);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyUnionObjectInspector result = cachedLazyUnionObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyUnionObjectInspector(ois, separator, nullSequence, escaped, escapeChar);
+            cachedLazyUnionObjectInspector.put(signature, result);
+        }
+        return result;
+    }
+
+    private LazyObjectInspectorFactory() {
+        // prevent instantiation
+    }
+}
\ No newline at end of file
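
Note on the caches above: the get-then-put sequence is a check-then-act race; two threads can construct duplicate inspectors for the same signature, which is harmless only because inspectors are stateless and interchangeable. For illustration only (not part of this patch), a sketch of the stricter putIfAbsent idiom on ConcurrentHashMap that guarantees a single canonical instance per key:

    import java.util.concurrent.ConcurrentHashMap;

    public class InternCache<K, V> {
        private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<K, V>();

        // Returns the canonical cached value even when two threads race to insert.
        public V intern(K key, V freshlyBuilt) {
            V previous = cache.putIfAbsent(key, freshlyBuilt);
            return previous == null ? freshlyBuilt : previous;
        }
    }
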
diff --git a/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java b/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java
new file mode 100644
index 0000000..95b999e
--- /dev/null
+++ b/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.typeinfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+/**
+ * TypeInfoFactory can be used to create the TypeInfo object for any type.
+ * TypeInfo objects are all read-only, so we can reuse them easily.
+ * TypeInfoFactory keeps internal caches to make sure we don't create two
+ * TypeInfo objects that represent the same type.
+ */
+public final class TypeInfoFactory {
+
+    static ConcurrentHashMap<String, TypeInfo> cachedPrimitiveTypeInfo = new ConcurrentHashMap<String, TypeInfo>();
+
+    private TypeInfoFactory() {
+        // prevent instantiation
+    }
+
+    public static TypeInfo getPrimitiveTypeInfo(String typeName) {
+        if (null == PrimitiveObjectInspectorUtils.getTypeEntryFromTypeName(typeName)) {
+            throw new RuntimeException("Cannot getPrimitiveTypeInfo for " + typeName);
+        }
+        TypeInfo result = cachedPrimitiveTypeInfo.get(typeName);
+        if (result == null) {
+            result = new PrimitiveTypeInfo(typeName);
+            cachedPrimitiveTypeInfo.put(typeName, result);
+        }
+        return result;
+    }
+
+    public static final TypeInfo voidTypeInfo = getPrimitiveTypeInfo(Constants.VOID_TYPE_NAME);
+    public static final TypeInfo booleanTypeInfo = getPrimitiveTypeInfo(Constants.BOOLEAN_TYPE_NAME);
+    public static final TypeInfo intTypeInfo = getPrimitiveTypeInfo(Constants.INT_TYPE_NAME);
+    public static final TypeInfo longTypeInfo = getPrimitiveTypeInfo(Constants.BIGINT_TYPE_NAME);
+    public static final TypeInfo stringTypeInfo = getPrimitiveTypeInfo(Constants.STRING_TYPE_NAME);
+    public static final TypeInfo floatTypeInfo = getPrimitiveTypeInfo(Constants.FLOAT_TYPE_NAME);
+    public static final TypeInfo doubleTypeInfo = getPrimitiveTypeInfo(Constants.DOUBLE_TYPE_NAME);
+    public static final TypeInfo byteTypeInfo = getPrimitiveTypeInfo(Constants.TINYINT_TYPE_NAME);
+    public static final TypeInfo shortTypeInfo = getPrimitiveTypeInfo(Constants.SMALLINT_TYPE_NAME);
+
+    public static final TypeInfo unknownTypeInfo = getPrimitiveTypeInfo("unknown");
+
+    public static TypeInfo getPrimitiveTypeInfoFromPrimitiveWritable(Class<?> clazz) {
+        String typeName = PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveWritable(clazz);
+        if (typeName == null) {
+            throw new RuntimeException("Internal error: Cannot get typeName for " + clazz);
+        }
+        return getPrimitiveTypeInfo(typeName);
+    }
+
+    public static TypeInfo getPrimitiveTypeInfoFromJavaPrimitive(Class<?> clazz) {
+        return getPrimitiveTypeInfo(PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveJava(clazz));
+    }
+
+    static ConcurrentHashMap<ArrayList<List<?>>, TypeInfo> cachedStructTypeInfo = new ConcurrentHashMap<ArrayList<List<?>>, TypeInfo>();
+
+    public static TypeInfo getStructTypeInfo(List<String> names, List<TypeInfo> typeInfos) {
+        ArrayList<List<?>> signature = new ArrayList<List<?>>(2);
+        signature.add(names);
+        signature.add(typeInfos);
+        TypeInfo result = cachedStructTypeInfo.get(signature);
+        if (result == null) {
+            result = new StructTypeInfo(names, typeInfos);
+            cachedStructTypeInfo.put(signature, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<List<?>, TypeInfo> cachedUnionTypeInfo = new ConcurrentHashMap<List<?>, TypeInfo>();
+
+    public static TypeInfo getUnionTypeInfo(List<TypeInfo> typeInfos) {
+        TypeInfo result = cachedUnionTypeInfo.get(typeInfos);
+        if (result == null) {
+            result = new UnionTypeInfo(typeInfos);
+            cachedUnionTypeInfo.put(typeInfos, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<TypeInfo, TypeInfo> cachedListTypeInfo = new ConcurrentHashMap<TypeInfo, TypeInfo>();
+
+    public static TypeInfo getListTypeInfo(TypeInfo elementTypeInfo) {
+        TypeInfo result = cachedListTypeInfo.get(elementTypeInfo);
+        if (result == null) {
+            result = new ListTypeInfo(elementTypeInfo);
+            cachedListTypeInfo.put(elementTypeInfo, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<ArrayList<TypeInfo>, TypeInfo> cachedMapTypeInfo = new ConcurrentHashMap<ArrayList<TypeInfo>, TypeInfo>();
+
+    public static TypeInfo getMapTypeInfo(TypeInfo keyTypeInfo, TypeInfo valueTypeInfo) {
+        ArrayList<TypeInfo> signature = new ArrayList<TypeInfo>(2);
+        signature.add(keyTypeInfo);
+        signature.add(valueTypeInfo);
+        TypeInfo result = cachedMapTypeInfo.get(signature);
+        if (result == null) {
+            result = new MapTypeInfo(keyTypeInfo, valueTypeInfo);
+            cachedMapTypeInfo.put(signature, result);
+        }
+        return result;
+    }
+
+}
\ No newline at end of file
diff --git a/hivesterix-dist/src/main/resources/scripts/getip.sh b/hivesterix-dist/src/main/resources/scripts/getip.sh
index e0cdf73..8c9ae76 100755
--- a/hivesterix-dist/src/main/resources/scripts/getip.sh
+++ b/hivesterix-dist/src/main/resources/scripts/getip.sh
@@ -8,6 +8,10 @@
         IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
 	if [ "$IPADDR" = "" ]
         then
+                IPADDR=`/sbin/ifconfig em1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi
+	if [ "$IPADDR" = "" ]
+        then
 		IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
         fi 
 else
diff --git a/hivesterix-dist/src/main/resources/scripts/startcc.sh b/hivesterix-dist/src/main/resources/scripts/startcc.sh
index fe2551d..efb79ce 100644
--- a/hivesterix-dist/src/main/resources/scripts/startcc.sh
+++ b/hivesterix-dist/src/main/resources/scripts/startcc.sh
@@ -22,4 +22,4 @@
 
 #Launch hyracks cc script
 chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
diff --git a/hivesterix-dist/src/main/resources/scripts/startnc.sh b/hivesterix-dist/src/main/resources/scripts/startnc.sh
index 6e0f90e..23a4c36 100644
--- a/hivesterix-dist/src/main/resources/scripts/startnc.sh
+++ b/hivesterix-dist/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hivesterix-dist/src/main/resources/scripts/stopnc.sh b/hivesterix-dist/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/hivesterix-dist/src/main/resources/scripts/stopnc.sh
+++ b/hivesterix-dist/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi
diff --git a/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java b/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
index d9492d9..393378f 100644
--- a/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
+++ b/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
@@ -22,8 +22,6 @@
 import org.apache.hadoop.mapred.MiniMRCluster;
 
 import edu.uci.ics.hivesterix.common.config.ConfUtil;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
@@ -93,7 +91,6 @@
         String ipAddress = hconf.get("hive.hyracks.host");
         int clientPort = Integer.parseInt(hconf.get("hive.hyracks.port"));
         int clusterPort = clientPort;
-        String applicationName = hconf.get("hive.hyracks.app");
 
         // start hyracks cc
         CCConfig ccConfig = new CCConfig();
@@ -113,14 +110,12 @@
             ncConfig.clusterNetIPAddress = ipAddress;
             ncConfig.ccPort = clientPort;
             ncConfig.dataIPAddress = "127.0.0.1";
+            ncConfig.datasetIPAddress = "127.0.0.1";
             ncConfig.nodeId = "nc" + i;
             NodeControllerService nc = new NodeControllerService(ncConfig);
             nc.start();
             ncs.put(ncConfig.nodeId, nc);
         }
-
-        IHyracksClientConnection hcc = new HyracksConnection(ccConfig.clientNetIpAddress, clientPort);
-        hcc.createApplication(applicationName, null);
     }
 
     protected void makeDir(String path) throws IOException {
diff --git a/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java b/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
index e455527..72c406f 100644
--- a/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
+++ b/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.hivesterix.test.base;
 
 import java.io.BufferedReader;
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -9,6 +8,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,8 +28,6 @@
 import org.apache.hadoop.mapred.MiniMRCluster;
 
 import edu.uci.ics.hivesterix.common.config.ConfUtil;
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
@@ -45,8 +43,9 @@
     private static final String PATH_TO_DATA = "src/test/resources/runtimefunctionts/data/";
 
     private static final String clusterPropertiesPath = "conf/cluster.properties";
-    private Properties clusterProps;
+    private static final String masterFilePath = "conf/master";
 
+    private Properties clusterProps;
     private MiniDFSCluster dfsCluster;
     private MiniMRCluster mrCluster;
 
@@ -109,14 +108,18 @@
             clusterProps.load(confIn);
             confIn.close();
         }
-        Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");
-        BufferedReader ipReader = new BufferedReader(new InputStreamReader(
-                new DataInputStream(process.getInputStream())));
-        String ipAddress = ipReader.readLine();
+        BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));
+        String masterNode = ipReader.readLine();
         ipReader.close();
+        InetAddress[] ips = InetAddress.getAllByName(masterNode);
+        String ipAddress = null;
+        for (InetAddress ip : ips) {
+            if (ip.getAddress().length <= 4) {
+                ipAddress = ip.getHostAddress();
+            }
+        }
         int clientPort = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));
         int netPort = Integer.parseInt(clusterProps.getProperty("CC_CLUSTERPORT"));
-        String applicationName = "hivesterix";
 
         // start hyracks cc
         CCConfig ccConfig = new CCConfig();
@@ -137,14 +140,12 @@
             ncConfig.clusterNetIPAddress = ipAddress;
             ncConfig.ccPort = netPort;
             ncConfig.dataIPAddress = "127.0.0.1";
+            ncConfig.datasetIPAddress = "127.0.0.1";
             ncConfig.nodeId = "nc" + i;
             NodeControllerService nc = new NodeControllerService(ncConfig);
             nc.start();
             ncs.put(ncConfig.nodeId, nc);
         }
-
-        IHyracksClientConnection hcc = new HyracksConnection(ccConfig.clientNetIpAddress, clientPort);
-        hcc.createApplication(applicationName, null);
     }
 
     protected void makeDir(String path) throws IOException {
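
Note: unlike ConfUtil, the loop here does not break on the first IPv4 match, so the last IPv4 address returned by the resolver is used; for a master host that resolves to a single address the two behave identically.
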
diff --git a/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml b/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
index 4aac091..4710706 100644
--- a/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
+++ b/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
@@ -1,12 +1,7 @@
 <cluster-topology>
-    <network-switch name="switch1">
-        <network-switch name="switch2">
-            <terminal name="nc0"/>
-            <terminal name="nc3"/>
-        </network-switch>
-        <network-switch name="switch3">
-            <terminal name="nc1"/>
-            <terminal name="nc4"/>
+    <network-switch name="Global">
+        <network-switch name="local">
+            <terminal name="127.0.0.1"/>
         </network-switch>
     </network-switch>
 </cluster-topology>
\ No newline at end of file
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index 4dfb1da..fc046fb 100644
--- a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -95,6 +95,13 @@
     }
 
     @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+            JobSpecification spec) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> arg0,
             IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, JobGenContext arg4,
             JobSpecification arg5) throws AlgebricksException {
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
index 0d2c78a..5e4e21e 100644
--- a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
@@ -86,7 +87,8 @@
             JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);
             String[] locConstraints = ConfUtil.getNCs();
             Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();
-            Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+            ClusterTopology topology = ConfUtil.getClusterTopology();
+            Scheduler scheduler = new Scheduler(ncNameToNcInfos, topology);
             InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);
             String[] schedule = scheduler.getLocationConstraints(splits);
             IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,
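
Note: constructing the HDFS Scheduler with the cluster topology from ConfUtil makes split assignment topology-aware, so splits with no node-local controller can be placed on nearby node controllers instead of arbitrary ones.
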
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
index 3a62b2e..81faf38 100644
--- a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
@@ -24,124 +24,130 @@
 @SuppressWarnings("deprecation")
 public class HiveFileWritePushRuntime implements IPushRuntime {
 
-    /**
-     * frame tuple accessor to access byte buffer
-     */
-    private final FrameTupleAccessor accessor;
+	/**
+	 * frame tuple accessor to access byte buffer
+	 */
+	private final FrameTupleAccessor accessor;
 
-    /**
-     * input object inspector
-     */
-    private final ObjectInspector inputInspector;
+	/**
+	 * input object inspector
+	 */
+	private final ObjectInspector inputInspector;
 
-    /**
-     * cachedInput
-     */
-    private final LazyColumnar cachedInput;
+	/**
+	 * cachedInput
+	 */
+	private final LazyColumnar cachedInput;
 
-    /**
-     * File sink operator of Hive
-     */
-    private final FileSinkDesc fileSink;
+	/**
+	 * File sink operator of Hive
+	 */
+	private final FileSinkDesc fileSink;
 
-    /**
-     * job configuration, which contain name node and other configuration
-     * information
-     */
-    private JobConf conf;
+	/**
+	 * job configuration, which contain name node and other configuration
+	 * information
+	 */
+	private JobConf conf;
 
-    /**
-     * input object inspector
-     */
-    private final Schema inputSchema;
+	/**
+	 * input object inspector
+	 */
+	private final Schema inputSchema;
 
-    /**
-     * a copy of hive schema representation
-     */
-    private RowSchema rowSchema;
+	/**
+	 * a copy of hive schema representation
+	 */
+	private RowSchema rowSchema;
 
-    /**
-     * the Hive file sink operator
-     */
-    private FileSinkOperator fsOp;
+	/**
+	 * the Hive file sink operator
+	 */
+	private FileSinkOperator fsOp;
 
-    /**
-     * cached tuple object reference
-     */
-    private FrameTupleReference tuple = new FrameTupleReference();
+	/**
+	 * cached tuple object reference
+	 */
+	private FrameTupleReference tuple = new FrameTupleReference();
 
-    /**
-     * @param spec
-     * @param fsProvider
-     */
-    public HiveFileWritePushRuntime(IHyracksTaskContext context, RecordDescriptor inputRecordDesc, JobConf job,
-            FileSinkDesc fs, RowSchema schema, Schema oi) {
-        fileSink = fs;
-        fileSink.setGatherStats(false);
+	/**
+	 * @param spec
+	 * @param fsProvider
+	 */
+	public HiveFileWritePushRuntime(IHyracksTaskContext context,
+			RecordDescriptor inputRecordDesc, JobConf job, FileSinkDesc fs,
+			RowSchema schema, Schema oi) {
+		fileSink = fs;
+		fileSink.setGatherStats(false);
 
-        rowSchema = schema;
-        conf = job;
-        inputSchema = oi;
+		rowSchema = schema;
+		conf = job;
+		inputSchema = oi;
 
-        accessor = new FrameTupleAccessor(context.getFrameSize(), inputRecordDesc);
-        inputInspector = inputSchema.toObjectInspector();
-        cachedInput = new LazyColumnar((LazyColumnarObjectInspector) inputInspector);
-    }
+		accessor = new FrameTupleAccessor(context.getFrameSize(),
+				inputRecordDesc);
+		inputInspector = inputSchema.toObjectInspector();
+		cachedInput = new LazyColumnar(
+				(LazyColumnarObjectInspector) inputInspector);
+	}
 
-    @Override
-    public void open() throws HyracksDataException {
-        fsOp = (FileSinkOperator) OperatorFactory.get(fileSink, rowSchema);
-        fsOp.setChildOperators(null);
-        fsOp.setParentOperators(null);
-        conf.setClassLoader(this.getClass().getClassLoader());
+	@Override
+	public void open() throws HyracksDataException {
+		fsOp = (FileSinkOperator) OperatorFactory.get(fileSink, rowSchema);
+		fsOp.setChildOperators(null);
+		fsOp.setParentOperators(null);
+		conf.setClassLoader(this.getClass().getClassLoader());
 
-        ObjectInspector[] inspectors = new ObjectInspector[1];
-        inspectors[0] = inputInspector;
-        try {
-            fsOp.initialize(conf, inspectors);
-            fsOp.setExecContext(null);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+		ObjectInspector[] inspectors = new ObjectInspector[1];
+		inspectors[0] = inputInspector;
+		try {
+			fsOp.initialize(conf, inspectors);
+			fsOp.setExecContext(null);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
 
-    @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        accessor.reset(buffer);
-        int n = accessor.getTupleCount();
-        try {
-            for (int i = 0; i < n; ++i) {
-                tuple.reset(accessor, i);
-                cachedInput.init(tuple);
-                fsOp.process(cachedInput, 0);
-            }
-        } catch (HiveException e) {
-            throw new HyracksDataException(e);
-        }
-    }
+	@Override
+	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+		accessor.reset(buffer);
+		int n = accessor.getTupleCount();
+		try {
+			for (int i = 0; i < n; ++i) {
+				tuple.reset(accessor, i);
+				cachedInput.init(tuple);
+				fsOp.process(cachedInput, 0);
+			}
+		} catch (HiveException e) {
+			throw new HyracksDataException(e);
+		}
+	}
 
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-            fsOp.closeOp(false);
-        } catch (HiveException e) {
-            throw new HyracksDataException(e);
-        }
-    }
+	@Override
+	public void close() throws HyracksDataException {
+		try {
+			Thread.currentThread().setContextClassLoader(
+					this.getClass().getClassLoader());
+			fsOp.closeOp(false);
+		} catch (HiveException e) {
+			throw new HyracksDataException(e);
+		}
+	}
 
-    @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-        throw new IllegalStateException();
-    }
+	@Override
+	public void setFrameWriter(int index, IFrameWriter writer,
+			RecordDescriptor recordDesc) {
+		throw new IllegalStateException();
+	}
 
-    @Override
-    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
-    }
+	@Override
+	public void setInputRecordDescriptor(int index,
+			RecordDescriptor recordDescriptor) {
+	}
 
-    @Override
-    public void fail() throws HyracksDataException {
+	@Override
+	public void fail() throws HyracksDataException {
 
-    }
+	}
 
 }