1. add a concurrent hash map patch for hive serde2; 2. minimize the hyracks cc history size
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3118 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java
new file mode 100644
index 0000000..95b999e
--- /dev/null
+++ b/hivesterix/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoFactory.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.typeinfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+
+/**
+ * TypeInfoFactory can be used to create the TypeInfo object for any types.
+ * TypeInfo objects are all read-only so we can reuse them easily.
+ * TypeInfoFactory has internal cache to make sure we don't create 2 TypeInfo
+ * objects that represents the same type.
+ */
+public final class TypeInfoFactory {
+
+ static ConcurrentHashMap<String, TypeInfo> cachedPrimitiveTypeInfo = new ConcurrentHashMap<String, TypeInfo>();
+
+ private TypeInfoFactory() {
+ // prevent instantiation
+ }
+
+ public static TypeInfo getPrimitiveTypeInfo(String typeName) {
+ if (null == PrimitiveObjectInspectorUtils.getTypeEntryFromTypeName(typeName)) {
+ throw new RuntimeException("Cannot getPrimitiveTypeInfo for " + typeName);
+ }
+ TypeInfo result = cachedPrimitiveTypeInfo.get(typeName);
+ if (result == null) {
+ result = new PrimitiveTypeInfo(typeName);
+ cachedPrimitiveTypeInfo.put(typeName, result);
+ }
+ return result;
+ }
+
+ public static final TypeInfo voidTypeInfo = getPrimitiveTypeInfo(Constants.VOID_TYPE_NAME);
+ public static final TypeInfo booleanTypeInfo = getPrimitiveTypeInfo(Constants.BOOLEAN_TYPE_NAME);
+ public static final TypeInfo intTypeInfo = getPrimitiveTypeInfo(Constants.INT_TYPE_NAME);
+ public static final TypeInfo longTypeInfo = getPrimitiveTypeInfo(Constants.BIGINT_TYPE_NAME);
+ public static final TypeInfo stringTypeInfo = getPrimitiveTypeInfo(Constants.STRING_TYPE_NAME);
+ public static final TypeInfo floatTypeInfo = getPrimitiveTypeInfo(Constants.FLOAT_TYPE_NAME);
+ public static final TypeInfo doubleTypeInfo = getPrimitiveTypeInfo(Constants.DOUBLE_TYPE_NAME);
+ public static final TypeInfo byteTypeInfo = getPrimitiveTypeInfo(Constants.TINYINT_TYPE_NAME);
+ public static final TypeInfo shortTypeInfo = getPrimitiveTypeInfo(Constants.SMALLINT_TYPE_NAME);
+
+ public static final TypeInfo unknownTypeInfo = getPrimitiveTypeInfo("unknown");
+
+ public static TypeInfo getPrimitiveTypeInfoFromPrimitiveWritable(Class<?> clazz) {
+ String typeName = PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveWritable(clazz);
+ if (typeName == null) {
+ throw new RuntimeException("Internal error: Cannot get typeName for " + clazz);
+ }
+ return getPrimitiveTypeInfo(typeName);
+ }
+
+ public static TypeInfo getPrimitiveTypeInfoFromJavaPrimitive(Class<?> clazz) {
+ return getPrimitiveTypeInfo(PrimitiveObjectInspectorUtils.getTypeNameFromPrimitiveJava(clazz));
+ }
+
+ static ConcurrentHashMap<ArrayList<List<?>>, TypeInfo> cachedStructTypeInfo = new ConcurrentHashMap<ArrayList<List<?>>, TypeInfo>();
+
+ public static TypeInfo getStructTypeInfo(List<String> names, List<TypeInfo> typeInfos) {
+ ArrayList<List<?>> signature = new ArrayList<List<?>>(2);
+ signature.add(names);
+ signature.add(typeInfos);
+ TypeInfo result = cachedStructTypeInfo.get(signature);
+ if (result == null) {
+ result = new StructTypeInfo(names, typeInfos);
+ cachedStructTypeInfo.put(signature, result);
+ }
+ return result;
+ }
+
+ static ConcurrentHashMap<List<?>, TypeInfo> cachedUnionTypeInfo = new ConcurrentHashMap<List<?>, TypeInfo>();
+
+ public static TypeInfo getUnionTypeInfo(List<TypeInfo> typeInfos) {
+ TypeInfo result = cachedUnionTypeInfo.get(typeInfos);
+ if (result == null) {
+ result = new UnionTypeInfo(typeInfos);
+ cachedUnionTypeInfo.put(typeInfos, result);
+ }
+ return result;
+ }
+
+ static ConcurrentHashMap<TypeInfo, TypeInfo> cachedListTypeInfo = new ConcurrentHashMap<TypeInfo, TypeInfo>();
+
+ public static TypeInfo getListTypeInfo(TypeInfo elementTypeInfo) {
+ TypeInfo result = cachedListTypeInfo.get(elementTypeInfo);
+ if (result == null) {
+ result = new ListTypeInfo(elementTypeInfo);
+ cachedListTypeInfo.put(elementTypeInfo, result);
+ }
+ return result;
+ }
+
+ static ConcurrentHashMap<ArrayList<TypeInfo>, TypeInfo> cachedMapTypeInfo = new ConcurrentHashMap<ArrayList<TypeInfo>, TypeInfo>();
+
+ public static TypeInfo getMapTypeInfo(TypeInfo keyTypeInfo, TypeInfo valueTypeInfo) {
+ ArrayList<TypeInfo> signature = new ArrayList<TypeInfo>(2);
+ signature.add(keyTypeInfo);
+ signature.add(valueTypeInfo);
+ TypeInfo result = cachedMapTypeInfo.get(signature);
+ if (result == null) {
+ result = new MapTypeInfo(keyTypeInfo, valueTypeInfo);
+ cachedMapTypeInfo.put(signature, result);
+ }
+ return result;
+ };
+
+}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh b/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
index fe2551d..efb79ce 100644
--- a/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
+++ b/hivesterix/hivesterix-dist/src/main/resources/scripts/startcc.sh
@@ -22,4 +22,4 @@
#Launch hyracks cc script
chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
index 3a62b2e..81faf38 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filewrite/HiveFileWritePushRuntime.java
@@ -24,124 +24,130 @@
@SuppressWarnings("deprecation")
public class HiveFileWritePushRuntime implements IPushRuntime {
- /**
- * frame tuple accessor to access byte buffer
- */
- private final FrameTupleAccessor accessor;
+ /**
+ * frame tuple accessor to access byte buffer
+ */
+ private final FrameTupleAccessor accessor;
- /**
- * input object inspector
- */
- private final ObjectInspector inputInspector;
+ /**
+ * input object inspector
+ */
+ private final ObjectInspector inputInspector;
- /**
- * cachedInput
- */
- private final LazyColumnar cachedInput;
+ /**
+ * cachedInput
+ */
+ private final LazyColumnar cachedInput;
- /**
- * File sink operator of Hive
- */
- private final FileSinkDesc fileSink;
+ /**
+ * File sink operator of Hive
+ */
+ private final FileSinkDesc fileSink;
- /**
- * job configuration, which contain name node and other configuration
- * information
- */
- private JobConf conf;
+ /**
+ * job configuration, which contain name node and other configuration
+ * information
+ */
+ private JobConf conf;
- /**
- * input object inspector
- */
- private final Schema inputSchema;
+ /**
+ * input object inspector
+ */
+ private final Schema inputSchema;
- /**
- * a copy of hive schema representation
- */
- private RowSchema rowSchema;
+ /**
+ * a copy of hive schema representation
+ */
+ private RowSchema rowSchema;
- /**
- * the Hive file sink operator
- */
- private FileSinkOperator fsOp;
+ /**
+ * the Hive file sink operator
+ */
+ private FileSinkOperator fsOp;
- /**
- * cached tuple object reference
- */
- private FrameTupleReference tuple = new FrameTupleReference();
+ /**
+ * cached tuple object reference
+ */
+ private FrameTupleReference tuple = new FrameTupleReference();
- /**
- * @param spec
- * @param fsProvider
- */
- public HiveFileWritePushRuntime(IHyracksTaskContext context, RecordDescriptor inputRecordDesc, JobConf job,
- FileSinkDesc fs, RowSchema schema, Schema oi) {
- fileSink = fs;
- fileSink.setGatherStats(false);
+ /**
+ * @param spec
+ * @param fsProvider
+ */
+ public HiveFileWritePushRuntime(IHyracksTaskContext context,
+ RecordDescriptor inputRecordDesc, JobConf job, FileSinkDesc fs,
+ RowSchema schema, Schema oi) {
+ fileSink = fs;
+ fileSink.setGatherStats(false);
- rowSchema = schema;
- conf = job;
- inputSchema = oi;
+ rowSchema = schema;
+ conf = job;
+ inputSchema = oi;
- accessor = new FrameTupleAccessor(context.getFrameSize(), inputRecordDesc);
- inputInspector = inputSchema.toObjectInspector();
- cachedInput = new LazyColumnar((LazyColumnarObjectInspector) inputInspector);
- }
+ accessor = new FrameTupleAccessor(context.getFrameSize(),
+ inputRecordDesc);
+ inputInspector = inputSchema.toObjectInspector();
+ cachedInput = new LazyColumnar(
+ (LazyColumnarObjectInspector) inputInspector);
+ }
- @Override
- public void open() throws HyracksDataException {
- fsOp = (FileSinkOperator) OperatorFactory.get(fileSink, rowSchema);
- fsOp.setChildOperators(null);
- fsOp.setParentOperators(null);
- conf.setClassLoader(this.getClass().getClassLoader());
+ @Override
+ public void open() throws HyracksDataException {
+ fsOp = (FileSinkOperator) OperatorFactory.get(fileSink, rowSchema);
+ fsOp.setChildOperators(null);
+ fsOp.setParentOperators(null);
+ conf.setClassLoader(this.getClass().getClassLoader());
- ObjectInspector[] inspectors = new ObjectInspector[1];
- inspectors[0] = inputInspector;
- try {
- fsOp.initialize(conf, inspectors);
- fsOp.setExecContext(null);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ ObjectInspector[] inspectors = new ObjectInspector[1];
+ inspectors[0] = inputInspector;
+ try {
+ fsOp.initialize(conf, inspectors);
+ fsOp.setExecContext(null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- accessor.reset(buffer);
- int n = accessor.getTupleCount();
- try {
- for (int i = 0; i < n; ++i) {
- tuple.reset(accessor, i);
- cachedInput.init(tuple);
- fsOp.process(cachedInput, 0);
- }
- } catch (HiveException e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int n = accessor.getTupleCount();
+ try {
+ for (int i = 0; i < n; ++i) {
+ tuple.reset(accessor, i);
+ cachedInput.init(tuple);
+ fsOp.process(cachedInput, 0);
+ }
+ } catch (HiveException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void close() throws HyracksDataException {
- try {
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- fsOp.closeOp(false);
- } catch (HiveException e) {
- throw new HyracksDataException(e);
- }
- }
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ Thread.currentThread().setContextClassLoader(
+ this.getClass().getClassLoader());
+ fsOp.closeOp(false);
+ } catch (HiveException e) {
+ throw new HyracksDataException(e);
+ }
+ }
- @Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- throw new IllegalStateException();
- }
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer,
+ RecordDescriptor recordDesc) {
+ throw new IllegalStateException();
+ }
- @Override
- public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
- }
+ @Override
+ public void setInputRecordDescriptor(int index,
+ RecordDescriptor recordDescriptor) {
+ }
- @Override
- public void fail() throws HyracksDataException {
+ @Override
+ public void fail() throws HyracksDataException {
- }
+ }
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index eba5560..7d50fa0 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -36,8 +36,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
/**
- * The runtime of a SuperActivity, which internally executes a DAG of one-to-one connected
- * activities in a single thread.
+ * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
+ * connected activities in a single thread.
*
* @author yingyib
*/
@@ -62,7 +62,8 @@
this.nPartitions = nPartitions;
/**
- * initialize the writer-relationship for the internal DAG of operator node pushables
+ * initialize the writer-relationship for the internal DAG of operator
+ * node pushables
*/
try {
init();
@@ -72,7 +73,7 @@
}
@Override
- public synchronized void initialize() throws HyracksDataException {
+ public void initialize() throws HyracksDataException {
/**
* initialize operator node pushables in the BFS order
*/
@@ -105,7 +106,8 @@
}
/**
- * Using BFS (breadth-first search) to construct to runtime execution DAG;
+ * Using BFS (breadth-first search) to construct to runtime execution
+ * DAG;
*/
while (childQueue.size() > 0) {
/**
@@ -165,8 +167,7 @@
}
@Override
- public synchronized void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer,
- RecordDescriptor recordDesc) {
+ public void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer, RecordDescriptor recordDesc) {
/**
* set the right output frame writer
*/
@@ -176,7 +177,7 @@
}
@Override
- public synchronized IFrameWriter getInputFrameWriter(final int index) {
+ public IFrameWriter getInputFrameWriter(final int index) {
/**
* get the right IFrameWriter from the cluster input index
*/
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/getip.sh b/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
index e0cdf73..a691c0f 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/getip.sh
@@ -6,6 +6,10 @@
then
#Get IP Address
IPADDR=`/sbin/ifconfig eth0 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ if [ "$IPADDR" = "" ]
+ then
+ IPADDR=`/sbin/ifconfig em1 | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
+ fi
if [ "$IPADDR" = "" ]
then
IPADDR=`/sbin/ifconfig lo | grep "inet " | awk '{print $2}' | cut -f 2 -d ':'`
diff --git a/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh b/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
index fe2551d..efb79ce 100755
--- a/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
+++ b/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
@@ -22,4 +22,4 @@
#Launch hyracks cc script
chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
diff --git a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
index fe2551d..efb79ce 100644
--- a/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
+++ b/pregelix/pregelix-core/src/main/resources/scripts/startcc.sh
@@ -22,4 +22,4 @@
#Launch hyracks cc script
chmod -R 755 $HYRACKS_HOME
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 3 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &