Merge fullstack_asterix_stabilization with fullstack_hyracks_result_distribution.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3170 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java b/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
index 379737f..e075f09 100644
--- a/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
+++ b/hivesterix-dist/src/main/java/edu/uci/ics/hivesterix/runtime/exec/HyracksExecutionEngine.java
@@ -558,9 +558,8 @@
             }

         }

 

-        String applicationName = "hivesterix";

         long start = System.currentTimeMillis();

-        JobId jobId = hcc.startJob(applicationName, job);

+        JobId jobId = hcc.startJob(job);

         hcc.waitForCompletion(jobId);

 

         // System.out.println("job finished: " + jobId.toString());

diff --git a/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java b/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
new file mode 100644
index 0000000..7920001
--- /dev/null
+++ b/hivesterix-dist/src/main/java/org/apache/hadoop/hive/serde2/lazy/objectinspector/LazyObjectInspectorFactory.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.lazy.objectinspector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Text;
+
+/**
+ * ObjectInspectorFactory is the primary way to create new ObjectInspector
+ * instances.
+ * SerDe classes should call the static functions in this library to create an
+ * ObjectInspector to return to the caller of SerDe2.getObjectInspector().
+ * The reason of having caches here is that ObjectInspectors do not have an
+ * internal state - so ObjectInspectors with the same construction parameters
+ * should result in exactly the same ObjectInspector.
+ */
+public final class LazyObjectInspectorFactory {
+
+    static ConcurrentHashMap<ArrayList<Object>, LazySimpleStructObjectInspector> cachedLazySimpleStructObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazySimpleStructObjectInspector>();
+
+    public static LazySimpleStructObjectInspector getLazySimpleStructObjectInspector(List<String> structFieldNames,
+            List<ObjectInspector> structFieldObjectInspectors, byte separator, Text nullSequence,
+            boolean lastColumnTakesRest, boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(structFieldNames);
+        signature.add(structFieldObjectInspectors);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(lastColumnTakesRest));
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazySimpleStructObjectInspector result = cachedLazySimpleStructObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazySimpleStructObjectInspector(structFieldNames, structFieldObjectInspectors, separator,
+                    nullSequence, lastColumnTakesRest, escaped, escapeChar);
+            cachedLazySimpleStructObjectInspector.put(signature, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<ArrayList<Object>, LazyListObjectInspector> cachedLazySimpleListObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazyListObjectInspector>();
+
+    public static LazyListObjectInspector getLazySimpleListObjectInspector(ObjectInspector listElementObjectInspector,
+            byte separator, Text nullSequence, boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(listElementObjectInspector);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyListObjectInspector result = cachedLazySimpleListObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyListObjectInspector(listElementObjectInspector, separator, nullSequence, escaped,
+                    escapeChar);
+            cachedLazySimpleListObjectInspector.put(signature, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<ArrayList<Object>, LazyMapObjectInspector> cachedLazySimpleMapObjectInspector = new ConcurrentHashMap<ArrayList<Object>, LazyMapObjectInspector>();
+
+    public static LazyMapObjectInspector getLazySimpleMapObjectInspector(ObjectInspector mapKeyObjectInspector,
+            ObjectInspector mapValueObjectInspector, byte itemSeparator, byte keyValueSeparator, Text nullSequence,
+            boolean escaped, byte escapeChar) {
+        ArrayList<Object> signature = new ArrayList<Object>();
+        signature.add(mapKeyObjectInspector);
+        signature.add(mapValueObjectInspector);
+        signature.add(Byte.valueOf(itemSeparator));
+        signature.add(Byte.valueOf(keyValueSeparator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyMapObjectInspector result = cachedLazySimpleMapObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyMapObjectInspector(mapKeyObjectInspector, mapValueObjectInspector, itemSeparator,
+                    keyValueSeparator, nullSequence, escaped, escapeChar);
+            cachedLazySimpleMapObjectInspector.put(signature, result);
+        }
+        return result;
+    }
+
+    static ConcurrentHashMap<List<Object>, LazyUnionObjectInspector> cachedLazyUnionObjectInspector = new ConcurrentHashMap<List<Object>, LazyUnionObjectInspector>();
+
+    public static LazyUnionObjectInspector getLazyUnionObjectInspector(List<ObjectInspector> ois, byte separator,
+            Text nullSequence, boolean escaped, byte escapeChar) {
+        List<Object> signature = new ArrayList<Object>();
+        signature.add(ois);
+        signature.add(Byte.valueOf(separator));
+        signature.add(nullSequence.toString());
+        signature.add(Boolean.valueOf(escaped));
+        signature.add(Byte.valueOf(escapeChar));
+        LazyUnionObjectInspector result = cachedLazyUnionObjectInspector.get(signature);
+        if (result == null) {
+            result = new LazyUnionObjectInspector(ois, separator, nullSequence, escaped, escapeChar);
+            cachedLazyUnionObjectInspector.put(signature, result);
+        }
+        return result;
+    }
+
+    private LazyObjectInspectorFactory() {
+        // prevent instantiation
+    }
+}
\ No newline at end of file
diff --git a/hivesterix-dist/src/main/resources/scripts/startnc.sh b/hivesterix-dist/src/main/resources/scripts/startnc.sh
index 6e0f90e..23a4c36 100644
--- a/hivesterix-dist/src/main/resources/scripts/startnc.sh
+++ b/hivesterix-dist/src/main/resources/scripts/startnc.sh
@@ -46,4 +46,4 @@
 cd $NCTMP_DIR
 
 #Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR  -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hivesterix-dist/src/main/resources/scripts/stopnc.sh b/hivesterix-dist/src/main/resources/scripts/stopnc.sh
index 03ce4e7..35c4794 100644
--- a/hivesterix-dist/src/main/resources/scripts/stopnc.sh
+++ b/hivesterix-dist/src/main/resources/scripts/stopnc.sh
@@ -5,6 +5,10 @@
 PID=`ps -ef|grep ${USER}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 
 if [ "$PID" == "" ]; then
+  PID=`ps -ef|grep ${USER}|grep java|grep 'hyracks'|awk '{print $2}'`
+fi
+
+if [ "$PID" == "" ]; then
   USERID=`id | sed 's/^uid=//;s/(.*$//'`
   PID=`ps -ef|grep ${USERID}|grep java|grep 'Dapp.name=hyracksnc'|awk '{print $2}'`
 fi
diff --git a/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java b/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
index c882742..393378f 100644
--- a/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
+++ b/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/perf/base/AbstractPerfTestSuiteClass.java
@@ -22,8 +22,6 @@
 import org.apache.hadoop.mapred.MiniMRCluster;

 

 import edu.uci.ics.hivesterix.common.config.ConfUtil;

-import edu.uci.ics.hyracks.api.client.HyracksConnection;

-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;

 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;

 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;

 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;

@@ -93,7 +91,6 @@
         String ipAddress = hconf.get("hive.hyracks.host");

         int clientPort = Integer.parseInt(hconf.get("hive.hyracks.port"));

         int clusterPort = clientPort;

-        String applicationName = hconf.get("hive.hyracks.app");

 

         // start hyracks cc

         CCConfig ccConfig = new CCConfig();

@@ -119,9 +116,6 @@
             nc.start();

             ncs.put(ncConfig.nodeId, nc);

         }

-

-        IHyracksClientConnection hcc = new HyracksConnection(ccConfig.clientNetIpAddress, clientPort);

-        hcc.createApplication(applicationName, null);

     }

 

     protected void makeDir(String path) throws IOException {

diff --git a/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java b/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
index b258221..72c406f 100644
--- a/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
+++ b/hivesterix-dist/src/test/java/edu/uci/ics/hivesterix/test/base/AbstractTestSuiteClass.java
@@ -28,8 +28,6 @@
 import org.apache.hadoop.mapred.MiniMRCluster;

 

 import edu.uci.ics.hivesterix.common.config.ConfUtil;

-import edu.uci.ics.hyracks.api.client.HyracksConnection;

-import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;

 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;

 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;

 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;

@@ -122,7 +120,6 @@
         }

         int clientPort = Integer.parseInt(clusterProps.getProperty("CC_CLIENTPORT"));

         int netPort = Integer.parseInt(clusterProps.getProperty("CC_CLUSTERPORT"));

-        String applicationName = "hivesterix";

 

         // start hyracks cc

         CCConfig ccConfig = new CCConfig();

@@ -149,9 +146,6 @@
             nc.start();

             ncs.put(ncConfig.nodeId, nc);

         }

-

-        IHyracksClientConnection hcc = new HyracksConnection(ccConfig.clientNetIpAddress, clientPort);

-        hcc.createApplication(applicationName, null);

     }

 

     protected void makeDir(String path) throws IOException {

diff --git a/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml b/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
index 4aac091..4710706 100644
--- a/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
+++ b/hivesterix-dist/src/test/resources/runtimefunctionts/hive/conf/topology.xml
@@ -1,12 +1,7 @@
 <cluster-topology>
-    <network-switch name="switch1">
-        <network-switch name="switch2">
-            <terminal name="nc0"/>
-            <terminal name="nc3"/>
-        </network-switch>
-        <network-switch name="switch3">
-            <terminal name="nc1"/>
-            <terminal name="nc4"/>
+    <network-switch name="Global">
+        <network-switch name="local">
+            <terminal name="127.0.0.1"/>
         </network-switch>
     </network-switch>
 </cluster-topology>
\ No newline at end of file
diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index 282bfb8..ce8cc77 100644
--- a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -102,6 +102,13 @@
     }

 

     @Override

+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,

+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,

+            JobSpecification spec) throws AlgebricksException {

+        return null;

+    }

+

+    @Override

     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> arg0,

             IOperatorSchema arg1, List<LogicalVariable> arg2, LogicalVariable arg3, JobGenContext arg4,

             JobSpecification arg5) throws AlgebricksException {

diff --git a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
index 0d2c78a..5e4e21e 100644
--- a/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
+++ b/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;

 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;

 import edu.uci.ics.hyracks.api.job.JobSpecification;

+import edu.uci.ics.hyracks.api.topology.ClusterTopology;

 import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;

 import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;

 

@@ -86,7 +87,8 @@
             JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);

             String[] locConstraints = ConfUtil.getNCs();

             Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();

-            Scheduler scheduler = new Scheduler(ncNameToNcInfos);

+            ClusterTopology topology = ConfUtil.getClusterTopology();

+            Scheduler scheduler = new Scheduler(ncNameToNcInfos, topology);

             InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);

             String[] schedule = scheduler.getLocationConstraints(splits);

             IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,