1. add a concurrent hash map patch for hive serde2; 2. minimize the hyracks cc history size

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3118 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java
new file mode 100644
index 0000000..95b999e
--- /dev/null
+++ b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.typeinfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+/**
+ * TypeInfoFactory can be used to create the TypeInfo object for any types.
+ * TypeInfo objects are all read-only so we can reuse them easily.
+ * TypeInfoFactory has internal cache to make sure we don't create 2 TypeInfo
+ * objects that represents the same type.
+ */
+public final class TypeInfoFactory {
+
+    static ConcurrentHashMap<String, TypeInfo> cachedPrimitiveTypeInfo = new ConcurrentHashMap<String, TypeInfo>();
+
+    private TypeInfoFactory() {
+        // prevent instantiation
+    }
+
+    public static TypeInfo getPrimitiveTypeInfo(String typeName) {
+        if (null == PrimitiveObjectInspectorUtils.getTypeEntryFromTypeName(typeName)) {
+            throw new RuntimeException("Cannot getPrimitiveTypeInfo for " + typeName);
+        }
+        TypeInfo result = cachedPrimitiveTypeInfo.get(typeName);
+        if (result == null) {
+            result = new PrimitiveTypeInfo(typeName);
+            cachedPrimitiveTypeInfo.put(typeName, result);
+        }
+        return result;
+    }
+
+    public static final TypeInfo voidTypeInfo = getPrimitiveTypeInfo(Constants.VOID_TYPE_NAME);
+    public static final TypeInfo booleanTypeInfo = getPrimitiveTypeInfo(Constants.BOOLEAN_TYPE_NAME);
+    public static final TypeInfo intTypeInfo = getPrimitiveTypeInfo(Constants.INT_TYPE_NAME);
+    public static final TypeInfo longTypeInfo = getPrimitiveTypeInfo(Constants.BIGINT_TYPE_NAME);
+    public static final TypeInfo stringTypeInfo = getPrimitiveTypeInfo(Constants.STRING_TYPE_NAME);
+    public static final TypeInfo floatTypeInfo = getPrimitiveTypeInfo(Constants.FLOAT_TYPE_NAME);
+    public static final TypeInfo doubleTypeInfo = getPrimitiveTypeInfo(Constants.DOUBLE_TYPE_NAME);
+    public static final TypeInfo byteTypeInfo = getPrimitiveTypeInfo(Constants.TINYINT_TYPE_NAME);
+    public static final TypeInfo shortTypeInfo = getPrimitiveTypeInfo(Constants.SMALLINT_TYPE_NAME);
+
+    public static final TypeInfo unknownTypeInfo = getPrimitiveTypeInfo("unknown");
+
+    public static TypeInfo getPrimitiveTypeInfoFromPrimitiveWritable(Class<?> clazz) {
+        String typeName = PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveWritable(clazz);
+        if (typeName == null) {
+            throw new RuntimeException("Internal error: Cannot get typeName for " + clazz);
+        }
+        return getPrimitiveTypeInfo(typeName);
+    }
+
+    public static TypeInfo getPrimitiveTypeInfoFromJavaPrimitive(Class<?> clazz) {
+        return getPrimitiveTypeInfo(PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveJava(clazz));
+    }
+
+    static ConcurrentHashMap<ArrayList<List<?>>, TypeInfo> cachedStructTypeInfo = new ConcurrentHashMap<ArrayList<List<?>>, TypeInfo>();
+
+    public static TypeInfo getStructTypeInfo(List<String> names, List<TypeInfo> typeInfos) {
+        ArrayList<List<?>> signature = new ArrayList<List<?>>(2);
+        signature.add(names);
+        signature.add(typeInfos);
+        TypeInfo result = cachedStructTypeInfo.get(signature);
+        if (result == null) {
+            result = new StructTypeInfo(names, typeInfos);
+            cachedStructTypeInfo.put(signature, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<List<?>, TypeInfo> cachedUnionTypeInfo = new ConcurrentHashMap<List<?>, TypeInfo>();
+
+    public static TypeInfo getUnionTypeInfo(List<TypeInfo> typeInfos) {
+        TypeInfo result = cachedUnionTypeInfo.get(typeInfos);
+        if (result == null) {
+            result = new UnionTypeInfo(typeInfos);
+            cachedUnionTypeInfo.put(typeInfos, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<TypeInfo, TypeInfo> cachedListTypeInfo = new ConcurrentHashMap<TypeInfo, TypeInfo>();
+
+    public static TypeInfo getListTypeInfo(TypeInfo elementTypeInfo) {
+        TypeInfo result = cachedListTypeInfo.get(elementTypeInfo);
+        if (result == null) {
+            result = new ListTypeInfo(elementTypeInfo);
+            cachedListTypeInfo.put(elementTypeInfo, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<ArrayList<TypeInfo>, TypeInfo> cachedMapTypeInfo = new ConcurrentHashMap<ArrayList<TypeInfo>, TypeInfo>();
+
+    public static TypeInfo getMapTypeInfo(TypeInfo keyTypeInfo, TypeInfo valueTypeInfo) {
+        ArrayList<TypeInfo> signature = new ArrayList<TypeInfo>(2);
+        signature.add(keyTypeInfo);
+        signature.add(valueTypeInfo);
+        TypeInfo result = cachedMapTypeInfo.get(signature);
+        if (result == null) {
+            result = new MapTypeInfo(keyTypeInfo, valueTypeInfo);
+            cachedMapTypeInfo.put(signature, result);
+        }
+        return result;
+    };
+
+}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
index fe2551d..efb79ce 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
@@ -22,4 +22,4 @@
 
 #Launch hyracks cc script
 chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
index 3a62b2e..81faf38 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
@@ -24,124 +24,130 @@
 @SuppressWarnings("deprecation")

 public class HiveFileWritePushRuntime implements IPushRuntime {

 

-    /**

-     * frame tuple accessor to access byte buffer

-     */

-    private final FrameTupleAccessor accessor;

+	/**

+	 * frame tuple accessor to access byte buffer

+	 */

+	private final FrameTupleAccessor accessor;

 

-    /**

-     * input object inspector

-     */

-    private final ObjectInspector inputInspector;

+	/**

+	 * input object inspector

+	 */

+	private final ObjectInspector inputInspector;

 

-    /**

-     * cachedInput

-     */

-    private final LazyColumnar cachedInput;

+	/**

+	 * cachedInput

+	 */

+	private final LazyColumnar cachedInput;

 

-    /**

-     * File sink operator of Hive

-     */

-    private final FileSinkDesc fileSink;

+	/**

+	 * File sink operator of Hive

+	 */

+	private final FileSinkDesc fileSink;

 

-    /**

-     * job configuration, which contain name node and other configuration

-     * information

-     */

-    private JobConf conf;

+	/**

+	 * job configuration, which contain name node and other configuration

+	 * information

+	 */

+	private JobConf conf;

 

-    /**

-     * input object inspector

-     */

-    private final Schema inputSchema;

+	/**

+	 * input object inspector

+	 */

+	private final Schema inputSchema;

 

-    /**

-     * a copy of hive schema representation

-     */

-    private RowSchema rowSchema;

+	/**

+	 * a copy of hive schema representation

+	 */

+	private RowSchema rowSchema;

 

-    /**

-     * the Hive file sink operator

-     */

-    private FileSinkOperator fsOp;

+	/**

+	 * the Hive file sink operator

+	 */

+	private FileSinkOperator fsOp;

 

-    /**

-     * cached tuple object reference

-     */

-    private FrameTupleReference tuple = new FrameTupleReference();

+	/**

+	 * cached tuple object reference

+	 */

+	private FrameTupleReference tuple = new FrameTupleReference();

 

-    /**

-     * @param spec

-     * @param fsProvider

-     */

-    public HiveFileWritePushRuntime(IHyracksTaskContext context, RecordDescriptor inputRecordDesc, JobConf job,

-            FileSinkDesc fs, RowSchema schema, Schema oi) {

-        fileSink = fs;

-        fileSink.setGatherStats(false);

+	/**

+	 * @param spec

+	 * @param fsProvider

+	 */

+	public HiveFileWritePushRuntime(IHyracksTaskContext context,

+			RecordDescriptor inputRecordDesc, JobConf job, FileSinkDesc fs,

+			RowSchema schema, Schema oi) {

+		fileSink = fs;

+		fileSink.setGatherStats(false);

 

-        rowSchema = schema;

-        conf = job;

-        inputSchema = oi;

+		rowSchema = schema;

+		conf = job;

+		inputSchema = oi;

 

-        accessor = new FrameTupleAccessor(context.getFrameSize(), inputRecordDesc);

-        inputInspector = inputSchema.toObjectInspector();

-        cachedInput = new LazyColumnar((LazyColumnarObjectInspector) inputInspector);

-    }

+		accessor = new FrameTupleAccessor(context.getFrameSize(),

+				inputRecordDesc);

+		inputInspector = inputSchema.toObjectInspector();

+		cachedInput = new LazyColumnar(

+				(LazyColumnarObjectInspector) inputInspector);

+	}

 

-    @Override

-    public void open() throws HyracksDataException {

-        fsOp = (FileSinkOperator) OperatorFactory.get(fileSink, rowSchema);

-        fsOp.setChildOperators(null);

-        fsOp.setParentOperators(null);

-        conf.setClassLoader(this.getClass().getClassLoader());

+	@Override

+	public void open() throws HyracksDataException {

+		fsOp = (FileSinkOperator) OperatorFactory.get(fileSink, rowSchema);

+		fsOp.setChildOperators(null);

+		fsOp.setParentOperators(null);

+		conf.setClassLoader(this.getClass().getClassLoader());

 

-        ObjectInspector[] inspectors = new ObjectInspector[1];

-        inspectors[0] = inputInspector;

-        try {

-            fsOp.initialize(conf, inspectors);

-            fsOp.setExecContext(null);

-        } catch (Exception e) {

-            e.printStackTrace();

-        }

-    }

+		ObjectInspector[] inspectors = new ObjectInspector[1];

+		inspectors[0] = inputInspector;

+		try {

+			fsOp.initialize(conf, inspectors);

+			fsOp.setExecContext(null);

+		} catch (Exception e) {

+			e.printStackTrace();

+		}

+	}

 

-    @Override

-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

-        accessor.reset(buffer);

-        int n = accessor.getTupleCount();

-        try {

-            for (int i = 0; i < n; ++i) {

-                tuple.reset(accessor, i);

-                cachedInput.init(tuple);

-                fsOp.process(cachedInput, 0);

-            }

-        } catch (HiveException e) {

-            throw new HyracksDataException(e);

-        }

-    }

+	@Override

+	public void nextFrame(ByteBuffer buffer) throws HyracksDataException {

+		accessor.reset(buffer);

+		int n = accessor.getTupleCount();

+		try {

+			for (int i = 0; i < n; ++i) {

+				tuple.reset(accessor, i);

+				cachedInput.init(tuple);

+				fsOp.process(cachedInput, 0);

+			}

+		} catch (HiveException e) {

+			throw new HyracksDataException(e);

+		}

+	}

 

-    @Override

-    public void close() throws HyracksDataException {

-        try {

-            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());

-            fsOp.closeOp(false);

-        } catch (HiveException e) {

-            throw new HyracksDataException(e);

-        }

-    }

+	@Override

+	public void close() throws HyracksDataException {

+		try {

+			Thread.currentThread().setContextClassLoader(

+					this.getClass().getClassLoader());

+			fsOp.closeOp(false);

+		} catch (HiveException e) {

+			throw new HyracksDataException(e);

+		}

+	}

 

-    @Override

-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {

-        throw new IllegalStateException();

-    }

+	@Override

+	public void setFrameWriter(int index, IFrameWriter writer,

+			RecordDescriptor recordDesc) {

+		throw new IllegalStateException();

+	}

 

-    @Override

-    public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {

-    }

+	@Override

+	public void setInputRecordDescriptor(int index,

+			RecordDescriptor recordDescriptor) {

+	}

 

-    @Override

-    public void fail() throws HyracksDataException {

+	@Override

+	public void fail() throws HyracksDataException {

 

-    }

+	}

 

 }

diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index eba5560..7d50fa0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -36,8 +36,8 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
- * The runtime of a SuperActivity, which internally executes a DAG of one-to-one connected
- * activities in a single thread.
+ * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
+ * connected activities in a single thread.
  * 
  * @author yingyib
  */
@@ -62,7 +62,8 @@
         this.nPartitions = nPartitions;
 
         /**
-         * initialize the writer-relationship for the internal DAG of operator node pushables
+         * initialize the writer-relationship for the internal DAG of operator
+         * node pushables
          */
         try {
             init();
@@ -72,7 +73,7 @@
     }
 
     @Override
-    public synchronized void initialize() throws HyracksDataException {
+    public void initialize() throws HyracksDataException {
         /**
          * initialize operator node pushables in the BFS order
          */
@@ -105,7 +106,8 @@
         }
 
         /**
-         * Using BFS (breadth-first search) to construct to runtime execution DAG;
+         * Using BFS (breadth-first search) to construct to runtime execution
+         * DAG;
          */
         while (childQueue.size() > 0) {
             /**
@@ -165,8 +167,7 @@
     }
 
     @Override
-    public synchronized void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer,
-            RecordDescriptor recordDesc) {
+    public void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer, RecordDescriptor recordDesc) {
         /**
          * set the right output frame writer
          */
@@ -176,7 +177,7 @@
     }
 
     @Override
-    public synchronized IFrameWriter getInputFrameWriter(final int index) {
+    public IFrameWriter getInputFrameWriter(final int index) {
         /**
          * get the right IFrameWriter from the cluster input index
          */
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/getip.sh b/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
index e0cdf73..a691c0f 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
@@ -6,6 +6,10 @@
 then
         #Get IP Address
         IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+    	if [ "$IPADDR" = "" ]
+        then
+		IPADDR=`/sbin/ifconfig em1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+        fi 
 	if [ "$IPADDR" = "" ]
         then
 		IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
index fe2551d..efb79ce 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
@@ -22,4 +22,4 @@
 
 #Launch hyracks cc script
 chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
index fe2551d..efb79ce 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -22,4 +22,4 @@
 
 #Launch hyracks cc script
 chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &