svn merge -r3112:3163 https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization svn merge -r3173:3188 https://hyracks.googlecode.com/svn/branches/fullstack_release_candidate@3234 svn merge -r3209:3233 https://hyracks.googlecode.com/svn/trunk/fullstack

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@3237 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index e6f47cf..379737f 100644
--- a/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -1,12 +1,12 @@
 package edu.uci.ics.hivesterix.runtime.exec;

 

 import java.io.BufferedReader;

-import java.io.DataInputStream;

 import java.io.FileInputStream;

 import java.io.InputStream;

 import java.io.InputStreamReader;

 import java.io.PrintWriter;

 import java.io.Serializable;

+import java.net.InetAddress;

 import java.util.ArrayList;

 import java.util.HashMap;

 import java.util.Iterator;

@@ -80,6 +80,7 @@
 

     private static final Log LOG = LogFactory.getLog(HyracksExecutionEngine.class.getName());

     private static final String clusterPropertiesPath = "conf/cluster.properties";

+    private static final String masterFilePath = "conf/master";

 

     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_LOGICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

     private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> DEFAULT_PHYSICAL_REWRITES = new ArrayList<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>();

@@ -151,6 +152,11 @@
      */

     private Properties clusterProps;

 

+    /**

+     * the Hyracks client connection

+     */

+    private IHyracksClientConnection hcc;

+

     public HyracksExecutionEngine(HiveConf conf) {

         this.conf = conf;

         init(conf);

@@ -533,15 +539,26 @@
             confIn.close();

         }

 

-        Process process = Runtime.getRuntime().exec("src/main/resources/scripts/getip.sh");

-        BufferedReader ipReader = new BufferedReader(new InputStreamReader(

-                new DataInputStream(process.getInputStream())));

-        String ipAddress = ipReader.readLine();

-        ipReader.close();

-        int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

-        String applicationName = "hivesterix";

+        if (hcc == null) {

+            BufferedReader ipReader = new BufferedReader(new InputStreamReader(new FileInputStream(masterFilePath)));

+            String masterNode = ipReader.readLine();

+            ipReader.close();

 

-        IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);

+            InetAddress[] ips = InetAddress.getAllByName(masterNode);

+            int port = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

+            for (InetAddress ip : ips) {

+                if (ip.getAddress().length <= 4) {

+                    try {

+                        hcc = new HyracksConnection(ip.getHostAddress(), port);

+                        break;

+                    } catch (Exception e) {

+                        continue;

+                    }

+                }

+            }

+        }

+

+        String applicationName = "hivesterix";

         long start = System.currentTimeMillis();

         JobId jobId = hcc.startJob(applicationName, job);

         hcc.waitForCompletion(jobId);

diff --git a/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
new file mode 100644
index 0000000..7920001
--- /dev/null
+++ b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.lazy.objectinspector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * LazyObjectInspectorFactory is the primary way to create new lazy
+ * ObjectInspector instances.
+ * SerDe classes should call the static functions in this library to create an
+ * ObjectInspector to return to the caller of SerDe2.getObjectInspector().
+ * The reason for having caches here is that ObjectInspectors do not have an
+ * internal state - so ObjectInspectors with the same construction parameters
+ * should result in exactly the same ObjectInspector.
+ * New entries are published with putIfAbsent, so concurrent callers passing
+ * equal construction parameters all receive the same instance.
+ */
+public final class LazyObjectInspectorFactory {
+
+    /** Struct inspectors, keyed by their list of construction parameters. */
+    static ConcurrentHashMap<ArrayList<Object>, LazySimpleStructObjectInspector> cachedLazySimpleStructObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazySimpleStructObjectInspector>();
+
+    /**
+     * Returns the struct inspector for the given construction parameters,
+     * creating and caching it on first use. The list arguments become part of
+     * the cache key, so callers should not modify them afterwards.
+     */
+    public static LazySimpleStructObjectInspector getLazySimpleStructObjectInspector(List<String> structFieldNames,
+            List<ObjectInspector> structFieldObjectInspectors, byte separator, Text nullSequence,
+            boolean lastColumnTakesRest, boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(structFieldNames);
+        signature.add(structFieldObjectInspectors);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(lastColumnTakesRest));
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazySimpleStructObjectInspector result = cachedLazySimpleStructObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazySimpleStructObjectInspector(structFieldNames, structFieldObjectInspectors, separator,
+                    nullSequence, lastColumnTakesRest, escaped, escapeChar);
+            // keep whichever instance was published first if another thread raced us
+            LazySimpleStructObjectInspector prev = cachedLazySimpleStructObjectInspector.putIfAbsent(signature, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+    /** List inspectors, keyed by their list of construction parameters. */
+    static ConcurrentHashMap<ArrayList<Object>, LazyListObjectInspector> cachedLazySimpleListObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazyListObjectInspector>();
+
+    /**
+     * Returns the list inspector for the given construction parameters,
+     * creating and caching it on first use.
+     */
+    public static LazyListObjectInspector getLazySimpleListObjectInspector(ObjectInspector listElementObjectInspector,
+            byte separator, Text nullSequence, boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(listElementObjectInspector);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyListObjectInspector result = cachedLazySimpleListObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyListObjectInspector(listElementObjectInspector, separator, nullSequence, escaped,
+                    escapeChar);
+            // keep whichever instance was published first if another thread raced us
+            LazyListObjectInspector prev = cachedLazySimpleListObjectInspector.putIfAbsent(signature, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+    /** Map inspectors, keyed by their list of construction parameters. */
+    static ConcurrentHashMap<ArrayList<Object>, LazyMapObjectInspector> cachedLazySimpleMapObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazyMapObjectInspector>();
+
+    /**
+     * Returns the map inspector for the given construction parameters,
+     * creating and caching it on first use.
+     */
+    public static LazyMapObjectInspector getLazySimpleMapObjectInspector(ObjectInspector mapKeyObjectInspector,
+            ObjectInspector mapValueObjectInspector, byte itemSeparator, byte keyValueSeparator, Text nullSequence,
+            boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(mapKeyObjectInspector);
+        signature.add(mapValueObjectInspector);
+        signature.add(Byte.valueOf(itemSeparator));
+        signature.add(Byte.valueOf(keyValueSeparator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyMapObjectInspector result = cachedLazySimpleMapObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyMapObjectInspector(mapKeyObjectInspector, mapValueObjectInspector, itemSeparator,
+                    keyValueSeparator, nullSequence, escaped, escapeChar);
+            // keep whichever instance was published first if another thread raced us
+            LazyMapObjectInspector prev = cachedLazySimpleMapObjectInspector.putIfAbsent(signature, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+    /** Union inspectors, keyed by their list of construction parameters. */
+    static ConcurrentHashMap<List<Object>, LazyUnionObjectInspector> cachedLazyUnionObjectInspector = new ConcurrentHashMap<List<Object>, LazyUnionObjectInspector>();
+
+    /**
+     * Returns the union inspector for the given construction parameters,
+     * creating and caching it on first use. The list argument becomes part of
+     * the cache key, so callers should not modify it afterwards.
+     */
+    public static LazyUnionObjectInspector getLazyUnionObjectInspector(List<ObjectInspector> ois, byte separator,
+            Text nullSequence, boolean escaped, byte escapeChar) {
+        List<Object> signature = new ArrayList<Object>();
+        signature.add(ois);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyUnionObjectInspector result = cachedLazyUnionObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyUnionObjectInspector(ois, separator, nullSequence, escaped, escapeChar);
+            // keep whichever instance was published first if another thread raced us
+            LazyUnionObjectInspector prev = cachedLazyUnionObjectInspector.putIfAbsent(signature, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+    private LazyObjectInspectorFactory() {
+        // prevent instantiation
+    }
+}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java
new file mode 100644
index 0000000..95b999e
--- /dev/null
+++ b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.typeinfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+/**
+ * TypeInfoFactory can be used to create the TypeInfo object for any types.
+ * TypeInfo objects are all read-only so we can reuse them easily.
+ * TypeInfoFactory has an internal cache to make sure we don't create 2 TypeInfo
+ * objects that represent the same type. New entries are published with
+ * putIfAbsent so that this also holds when several threads ask for the same
+ * type at once.
+ */
+public final class TypeInfoFactory {
+
+    /** Primitive TypeInfos, keyed by type name. */
+    static ConcurrentHashMap<String, TypeInfo> cachedPrimitiveTypeInfo = new ConcurrentHashMap<String, TypeInfo>();
+
+    private TypeInfoFactory() {
+        // prevent instantiation
+    }
+
+    /**
+     * Returns the cached TypeInfo for a primitive type name, creating it on
+     * first use.
+     *
+     * @throws RuntimeException if the type name has no primitive type entry
+     */
+    public static TypeInfo getPrimitiveTypeInfo(String typeName) {
+        if (null == PrimitiveObjectInspectorUtils.getTypeEntryFromTypeName(typeName)) {
+            throw new RuntimeException("Cannot getPrimitiveTypeInfo for " + typeName);
+        }
+        TypeInfo result = cachedPrimitiveTypeInfo.get(typeName);
+        if (result == null) {
+            result = new PrimitiveTypeInfo(typeName);
+            // keep whichever instance was published first if another thread raced us
+            TypeInfo prev = cachedPrimitiveTypeInfo.putIfAbsent(typeName, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+    public static final TypeInfo voidTypeInfo = getPrimitiveTypeInfo(Constants.VOID_TYPE_NAME);
+    public static final TypeInfo booleanTypeInfo = getPrimitiveTypeInfo(Constants.BOOLEAN_TYPE_NAME);
+    public static final TypeInfo intTypeInfo = getPrimitiveTypeInfo(Constants.INT_TYPE_NAME);
+    public static final TypeInfo longTypeInfo = getPrimitiveTypeInfo(Constants.BIGINT_TYPE_NAME);
+    public static final TypeInfo stringTypeInfo = getPrimitiveTypeInfo(Constants.STRING_TYPE_NAME);
+    public static final TypeInfo floatTypeInfo = getPrimitiveTypeInfo(Constants.FLOAT_TYPE_NAME);
+    public static final TypeInfo doubleTypeInfo = getPrimitiveTypeInfo(Constants.DOUBLE_TYPE_NAME);
+    public static final TypeInfo byteTypeInfo = getPrimitiveTypeInfo(Constants.TINYINT_TYPE_NAME);
+    public static final TypeInfo shortTypeInfo = getPrimitiveTypeInfo(Constants.SMALLINT_TYPE_NAME);
+
+    public static final TypeInfo unknownTypeInfo = getPrimitiveTypeInfo("unknown");
+
+    /**
+     * Returns the primitive TypeInfo for the type name reported by
+     * PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveWritable(clazz).
+     *
+     * @throws RuntimeException if no type name is reported for the class
+     */
+    public static TypeInfo getPrimitiveTypeInfoFromPrimitiveWritable(Class<?> clazz) {
+        String typeName = PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveWritable(clazz);
+        if (typeName == null) {
+            throw new RuntimeException("Internal error: Cannot get typeName for " + clazz);
+        }
+        return getPrimitiveTypeInfo(typeName);
+    }
+
+    /** Returns the primitive TypeInfo for the type name reported by getTypeNameFromPrimitiveJava(clazz). */
+    public static TypeInfo getPrimitiveTypeInfoFromJavaPrimitive(Class<?> clazz) {
+        return getPrimitiveTypeInfo(PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveJava(clazz));
+    }
+
+    /** Struct TypeInfos, keyed by the pair (field names, field TypeInfos). */
+    static ConcurrentHashMap<ArrayList<List<?>>, TypeInfo> cachedStructTypeInfo = new ConcurrentHashMap<ArrayList<List<?>>, TypeInfo>();
+
+    /**
+     * Returns the cached struct TypeInfo for the given field names and field
+     * types, creating it on first use. The list arguments become part of the
+     * cache key, so callers should not modify them afterwards.
+     */
+    public static TypeInfo getStructTypeInfo(List<String> names, List<TypeInfo> typeInfos) {
+        ArrayList<List<?>> signature = new ArrayList<List<?>>(2);
+        signature.add(names);
+        signature.add(typeInfos);
+        TypeInfo result = cachedStructTypeInfo.get(signature);
+        if (result == null) {
+            result = new StructTypeInfo(names, typeInfos);
+            // keep whichever instance was published first if another thread raced us
+            TypeInfo prev = cachedStructTypeInfo.putIfAbsent(signature, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+    /** Union TypeInfos, keyed by the list of member TypeInfos. */
+    static ConcurrentHashMap<List<?>, TypeInfo> cachedUnionTypeInfo = new ConcurrentHashMap<List<?>, TypeInfo>();
+
+    /**
+     * Returns the cached union TypeInfo for the given member types, creating it
+     * on first use. The list argument is used as the cache key, so callers
+     * should not modify it afterwards.
+     */
+    public static TypeInfo getUnionTypeInfo(List<TypeInfo> typeInfos) {
+        TypeInfo result = cachedUnionTypeInfo.get(typeInfos);
+        if (result == null) {
+            result = new UnionTypeInfo(typeInfos);
+            // keep whichever instance was published first if another thread raced us
+            TypeInfo prev = cachedUnionTypeInfo.putIfAbsent(typeInfos, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+    /** List TypeInfos, keyed by element TypeInfo. */
+    static ConcurrentHashMap<TypeInfo, TypeInfo> cachedListTypeInfo = new ConcurrentHashMap<TypeInfo, TypeInfo>();
+
+    /**
+     * Returns the cached list TypeInfo for the given element type, creating it
+     * on first use.
+     */
+    public static TypeInfo getListTypeInfo(TypeInfo elementTypeInfo) {
+        TypeInfo result = cachedListTypeInfo.get(elementTypeInfo);
+        if (result == null) {
+            result = new ListTypeInfo(elementTypeInfo);
+            // keep whichever instance was published first if another thread raced us
+            TypeInfo prev = cachedListTypeInfo.putIfAbsent(elementTypeInfo, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+    /** Map TypeInfos, keyed by the pair (key TypeInfo, value TypeInfo). */
+    static ConcurrentHashMap<ArrayList<TypeInfo>, TypeInfo> cachedMapTypeInfo = new ConcurrentHashMap<ArrayList<TypeInfo>, TypeInfo>();
+
+    /**
+     * Returns the cached map TypeInfo for the given key and value types,
+     * creating it on first use.
+     */
+    public static TypeInfo getMapTypeInfo(TypeInfo keyTypeInfo, TypeInfo valueTypeInfo) {
+        ArrayList<TypeInfo> signature = new ArrayList<TypeInfo>(2);
+        signature.add(keyTypeInfo);
+        signature.add(valueTypeInfo);
+        TypeInfo result = cachedMapTypeInfo.get(signature);
+        if (result == null) {
+            result = new MapTypeInfo(keyTypeInfo, valueTypeInfo);
+            // keep whichever instance was published first if another thread raced us
+            TypeInfo prev = cachedMapTypeInfo.putIfAbsent(signature, result);
+            if (prev != null) {
+                result = prev;
+            }
+        }
+        return result;
+    }
+
+}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-dist/src/main/resources/conf/topology-template.xml b/hivesterix/hivesterix-dist/src/main/resources/conf/topology-template.xml
new file mode 100755
index 0000000..4710706
--- /dev/null
+++ b/hivesterix/hivesterix-dist/src/main/resources/conf/topology-template.xml
@@ -0,0 +1,7 @@
+<cluster-topology>
+    <network-switch name="Global">
+        <network-switch name="local">
+            <terminal name="127.0.0.1"/>
+        </network-switch>
+    </network-switch>
+</cluster-topology>
\ No newline at end of file
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/getip.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/getip.sh
index e0cdf73..8c9ae76 100755
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/getip.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/getip.sh
@@ -8,6 +8,10 @@
         IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
 	if [ "$IPADDR" = "" ]
         then
+                IPADDR=`/sbin/ifconfig em1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi
+	if [ "$IPADDR" = "" ]
+        then
 		IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
         fi 
 else
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
index fe2551d..484ecac 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
@@ -22,4 +22,10 @@
 
 #Launch hyracks cc script
 chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+if [ -f "conf/topology.xml"  ]; then
+#Launch hyracks cc script with topology
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+else
+#Launch hyracks cc script without topology
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+fi
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
index 6e0f90e..23a4c36 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi