Merged hyracks_asterix_vldb_demo -r 1860:1862

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1971 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
index a94c6de..c4989c5 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
@@ -14,8 +14,13 @@
  */
 package edu.uci.ics.hyracks.api.context;
 
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 
 public interface IHyracksRootContext {
     public IIOManager getIOManager();
+
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 32b031d..5a33891 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -70,6 +70,7 @@
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
 import edu.uci.ics.hyracks.control.common.work.IPCResponder;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
 import edu.uci.ics.hyracks.ipc.api.IIPCI;
@@ -352,7 +353,8 @@
 
     private class ClusterControllerIPCI implements IIPCI {
         @Override
-        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+        public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+                Exception exception) {
             CCNCFunctions.Function fn = (Function) payload;
             switch (fn.getFunctionId()) {
                 case REGISTER_NODE: {
@@ -425,6 +427,23 @@
                             .getAppName(), rsf.getNodeId()));
                     return;
                 }
+
+                case GET_NODE_CONTROLLERS_INFO: {
+                    workQueue.schedule(new GetNodeControllersInfoWork(ClusterControllerService.this,
+                            new IResultCallback<Map<String, NodeControllerInfo>>() {
+                                @Override
+                                public void setValue(Map<String, NodeControllerInfo> result) {
+                                    new IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
+                                            .setValue(new CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
+                                }
+
+                                @Override
+                                public void setException(Exception e) {
+
+                                }
+                            }));
+                    return;
+                }
             }
             LOGGER.warning("Unknown function: " + fn.getFunctionId());
         }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index a25250a..0c5bb2f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -18,7 +18,6 @@
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.messages.IMessage;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
@@ -50,4 +49,6 @@
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
 
     public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
+
+    public void getNodeControllerInfos() throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index 8f0056f..557a8cb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -28,6 +28,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -77,6 +78,8 @@
         DESTROY_APPLICATION,
         REPORT_PARTITION_AVAILABILITY,
         SEND_APPLICATION_MESSAGE,
+        GET_NODE_CONTROLLERS_INFO,
+        GET_NODE_CONTROLLERS_INFO_RESPONSE,
 
         OTHER
     }
@@ -643,6 +646,34 @@
         }
     }
 
+    public static class GetNodeControllersInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_NODE_CONTROLLERS_INFO;
+        }
+    }
+
+    public static class GetNodeControllersInfoResponseFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final Map<String, NodeControllerInfo> ncInfos;
+
+        public GetNodeControllersInfoResponseFunction(Map<String, NodeControllerInfo> ncInfos) {
+            this.ncInfos = ncInfos;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_NODE_CONTROLLERS_INFO_RESPONSE;
+        }
+
+        public Map<String, NodeControllerInfo> getNodeControllerInfos() {
+            return ncInfos;
+        }
+    }
+
     public static class ReportPartitionAvailabilityFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index d789768..bbaab4e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -103,7 +103,13 @@
 
     @Override
     public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
-        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data, appName, nodeId);
+        CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+                appName, nodeId);
         ipcHandle.send(-1, fn, null);
     }
+
+    @Override
+    public void getNodeControllerInfos() throws Exception {
+        ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 58a173c..0195143 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -40,6 +40,10 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.api.job.JobId;
@@ -122,6 +126,8 @@
 
     private final OperatingSystemMXBean osMXBean;
 
+    private final Mutable<FutureValue<Map<String, NodeControllerInfo>>> getNodeControllerInfosAcceptor;
+
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
@@ -129,7 +135,7 @@
         NodeControllerIPCI ipci = new NodeControllerIPCI();
         ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
                 new CCNCFunctions.SerializerDeserializer());
-        this.ctx = new RootHyracksContext(new IOManager(getDevices(ncConfig.ioDevices), executor));
+        this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices), executor));
         if (id == null) {
             throw new Exception("id not set");
         }
@@ -148,6 +154,7 @@
         runtimeMXBean = ManagementFactory.getRuntimeMXBean();
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
         registrationPending = true;
+        getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
     }
 
     public IHyracksRootContext getRootContext() {
@@ -171,6 +178,28 @@
         notifyAll();
     }
 
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+        FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
+        synchronized (getNodeControllerInfosAcceptor) {
+            while (getNodeControllerInfosAcceptor.getValue() != null) {
+                getNodeControllerInfosAcceptor.wait();
+            }
+            getNodeControllerInfosAcceptor.setValue(fv);
+        }
+        ccs.getNodeControllerInfos();
+        return fv.get();
+    }
+
+    private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
+        FutureValue<Map<String, NodeControllerInfo>> fv;
+        synchronized (getNodeControllerInfosAcceptor) {
+            fv = getNodeControllerInfosAcceptor.getValue();
+            getNodeControllerInfosAcceptor.setValue(null);
+            getNodeControllerInfosAcceptor.notifyAll();
+        }
+        fv.setValue(ncInfos);
+    }
+
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
@@ -418,6 +447,12 @@
                     setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
                     return;
                 }
+
+                case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
+                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+                    setNodeControllersInfo(gncirf.getNodeControllerInfos());
+                    return;
+                }
             }
             throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
index 9e42277..4651149 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -14,13 +14,20 @@
  */
 package edu.uci.ics.hyracks.control.nc.runtime;
 
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 
 public class RootHyracksContext implements IHyracksRootContext {
+    private final NodeControllerService ncs;
+
     private final IIOManager ioManager;
 
-    public RootHyracksContext(IIOManager ioManager) {
+    public RootHyracksContext(NodeControllerService ncs, IIOManager ioManager) {
+        this.ncs = ncs;
         this.ioManager = ioManager;
     }
 
@@ -28,4 +35,9 @@
     public IIOManager getIOManager() {
         return ioManager;
     }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+        return ncs.getNodeControllersInfo();
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
index 927d21c..6a9dab7 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/parsers/LongParserFactory.java
@@ -48,6 +48,8 @@
 
                         case '-':
                             sign = -1;
+                            pre = false;
+                            break;
 
                         case '0':
                         case '1':
@@ -60,6 +62,7 @@
                         case '8':
                         case '9':
                             pre = false;
+                            n = n * 10 + (ch - '0');
                             break;
 
                         default:
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
index e195036..2fe5ded 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
@@ -17,8 +17,10 @@
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executors;
 
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IIOManager;
@@ -38,4 +40,9 @@
     public IIOManager getIOManager() {
         return ioManager;
     }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+        return null;
+    }
 }
\ No newline at end of file