Merged trunk -r 363:437 into branch

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@468 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/.settings/org.eclipse.jdt.core.prefs b/hyracks-control-nc/.settings/org.eclipse.jdt.core.prefs
index 05fa00c..dfac000 100644
--- a/hyracks-control-nc/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-control-nc/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Tue Sep 28 12:04:33 PDT 2010
+#Fri May 20 19:34:05 PDT 2011
 eclipse.preferences.version=1
 org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
 org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-control-nc/pom.xml b/hyracks-control-nc/pom.xml
index a8c9226..b90f4ce 100644
--- a/hyracks-control-nc/pom.xml
+++ b/hyracks-control-nc/pom.xml
@@ -2,12 +2,12 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-control-nc</artifactId>
-  <version>0.1.4</version>
+  <version>0.1.5</version>
 
   <parent>
     <groupId>edu.uci.ics.hyracks</groupId>
     <artifactId>hyracks</artifactId>
-    <version>0.1.4</version>
+    <version>0.1.5</version>
   </parent>
 
   <build>
@@ -33,7 +33,7 @@
   	<dependency>
   		<groupId>edu.uci.ics.hyracks</groupId>
   		<artifactId>hyracks-control-common</artifactId>
-  		<version>0.1.4</version>
+  		<version>0.1.5</version>
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 6cc7450..413243e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
@@ -65,9 +66,9 @@
         this.appCtx = appCtx;
         this.jobId = jobId;
         this.attempt = attempt;
-        stageletMap = new HashMap<UUID, Stagelet>();
-        envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
-        counterMap = new HashMap<String, Counter>();
+        stageletMap = new Hashtable<UUID, Stagelet>();
+        envMap = new Hashtable<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+        counterMap = new Hashtable<String, Counter>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
     }
@@ -78,8 +79,10 @@
     }
 
     public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
-        if (!envMap.containsKey(hod.getOperatorId())) {
-            envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+        synchronized (envMap) {
+            if (!envMap.containsKey(hod.getOperatorId())) {
+                envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+            }
         }
         Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
         if (!opEnvMap.containsKey(partition)) {
@@ -118,7 +121,7 @@
         return nodeController.getExecutor();
     }
 
-    public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
+    public void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
         stageletMap.remove(stageId);
         nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
     }
@@ -132,15 +135,19 @@
         return nodeController;
     }
 
-    public synchronized void dumpProfile(JobletProfile jProfile) {
-        Map<String, Long> counters = jProfile.getCounters();
-        for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
-            counters.put(e.getKey(), e.getValue().get());
+    public void dumpProfile(JobletProfile jProfile) {
+        synchronized (counterMap) {
+            Map<String, Long> counters = jProfile.getCounters();
+            for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+                counters.put(e.getKey(), e.getValue().get());
+            }
         }
-        for (Stagelet si : stageletMap.values()) {
-            StageletProfile sProfile = new StageletProfile(si.getStageId());
-            si.dumpProfile(sProfile);
-            jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+        synchronized (stageletMap) {
+            for (Stagelet si : stageletMap.values()) {
+                StageletProfile sProfile = new StageletProfile(si.getStageId());
+                si.dumpProfile(sProfile);
+                jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+            }
         }
     }
 
@@ -193,12 +200,14 @@
     }
 
     @Override
-    public synchronized ICounter getCounter(String name, boolean create) {
-        Counter counter = counterMap.get(name);
-        if (counter == null && create) {
-            counter = new Counter(name);
-            counterMap.put(name, counter);
+    public ICounter getCounter(String name, boolean create) {
+        synchronized (counterMap) {
+            Counter counter = counterMap.get(name);
+            if (counter == null && create) {
+                counter = new Counter(name);
+                counterMap.put(name, counter);
+            }
+            return counter;
         }
-        return counter;
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2702386..be48ae6 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -129,7 +129,7 @@
         }
         nodeCapability = computeNodeCapability();
         connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
-        jobletMap = new HashMap<UUID, Joblet>();
+        jobletMap = new Hashtable<UUID, Joblet>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
                 NodeControllerService.class.getName()), id));
@@ -251,13 +251,11 @@
                 for (int i : tasks.get(hanId)) {
                     IOperatorNodePushable hon = han.createPushRuntime(stagelet, joblet.getEnvironment(op, i), rdp, i,
                             opNumPartitions.get(op.getOperatorId()));
-                    OperatorRunnable or = new OperatorRunnable(stagelet, hon);
+                    OperatorRunnable or = new OperatorRunnable(stagelet, hon, inputs == null ? 0 : inputs.size(),
+                            executor);
                     stagelet.setOperator(op.getOperatorId(), i, or);
                     if (inputs != null) {
                         for (int j = 0; j < inputs.size(); ++j) {
-                            if (j >= 1) {
-                                throw new IllegalStateException();
-                            }
                             IConnectorDescriptor conn = inputs.get(j);
                             OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
                                     .getOperatorId();
@@ -276,7 +274,7 @@
                             portMap.put(piId, endpoint);
                             IFrameReader reader = createReader(stagelet, conn, drlf, i, plan, stagelet,
                                     opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
-                            or.setFrameReader(reader);
+                            or.setFrameReader(j, reader);
                         }
                     }
                     honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
@@ -438,19 +436,20 @@
         si.setEndpointList(null);
     }
 
-    private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+    private Joblet getLocalJoblet(UUID jobId) throws Exception {
         Joblet ji = jobletMap.get(jobId);
         return ji;
     }
 
-    private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx)
-            throws Exception {
-        Joblet ji = jobletMap.get(jobId);
-        if (ji == null || ji.getAttempt() != attempt) {
-            ji = new Joblet(this, jobId, attempt, appCtx);
-            jobletMap.put(jobId, ji);
+    private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx) throws Exception {
+        synchronized (jobletMap) {
+            Joblet ji = jobletMap.get(jobId);
+            if (ji == null || ji.getAttempt() != attempt) {
+                ji = new Joblet(this, jobId, attempt, appCtx);
+                jobletMap.put(jobId, ji);
+            }
+            return ji;
         }
-        return ji;
     }
 
     public Executor getExecutor() {
@@ -458,7 +457,7 @@
     }
 
     @Override
-    public synchronized void cleanUpJob(UUID jobId) throws Exception {
+    public void cleanUpJob(UUID jobId) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
@@ -563,7 +562,7 @@
     }
 
     @Override
-    public synchronized void abortJoblet(UUID jobId, int attempt) throws Exception {
+    public void abortJoblet(UUID jobId, int attempt) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Aborting Job: " + jobId + ":" + attempt);
         }
@@ -588,7 +587,7 @@
             if (applications.containsKey(appName)) {
                 throw new HyracksException("Duplicate application with name: " + appName + " being created.");
             }
-            appCtx = new NCApplicationContext(serverCtx, ctx, appName);
+            appCtx = new NCApplicationContext(serverCtx, ctx, appName, id);
             applications.put(appName, appCtx);
         }
         if (deployHar) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index eb283c2..68b2cad 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -146,9 +146,13 @@
         });
     }
 
-    protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
-        pendingOperators.remove(opIId);
-        if (pendingOperators.isEmpty()) {
+    protected void notifyOperatorCompletion(OperatorInstanceId opIId) {
+        boolean done = false;
+        synchronized (pendingOperators) {
+            pendingOperators.remove(opIId);
+            done = pendingOperators.isEmpty();
+        }
+        if (done) {
             try {
                 StageletProfile sProfile = new StageletProfile(stageId);
                 dumpProfile(sProfile);
@@ -227,7 +231,7 @@
     }
 
     @Override
-    public ICounter getCounter(String name, boolean create) {
+    public synchronized ICounter getCounter(String name, boolean create) {
         Counter counter = counterMap.get(name);
         if (counter == null && create) {
             counter = new Counter(name);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
index 349743b..06f4212 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -10,16 +10,23 @@
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
 public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+    private final String nodeId;
     private final IHyracksRootContext rootCtx;
     private Object appObject;
 
-    public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String appName)
+    public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String appName, String nodeId)
             throws IOException {
         super(serverCtx, appName);
+        this.nodeId = nodeId;
         this.rootCtx = rootCtx;
     }
 
     @Override
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
     public void setDistributedState(Serializable state) {
         distributedState = state;
     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index eca7fd0..8edd992 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -15,6 +15,8 @@
 package edu.uci.ics.hyracks.control.nc.runtime;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,22 +26,27 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class OperatorRunnable implements Runnable {
+    private final IHyracksStageletContext ctx;
     private final IOperatorNodePushable opNode;
-    private IFrameReader reader;
-    private ByteBuffer buffer;
+    private final int nInputs;
+    private final Executor executor;
+    private IFrameReader[] readers;
     private volatile boolean abort;
 
-    public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode) {
+    public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode, int nInputs, Executor executor) {
+        this.ctx = ctx;
         this.opNode = opNode;
-        buffer = ctx.allocateFrame();
+        this.nInputs = nInputs;
+        this.executor = executor;
+        readers = new IFrameReader[nInputs];
     }
 
     public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         opNode.setOutputFrameWriter(index, writer, recordDesc);
     }
 
-    public void setFrameReader(IFrameReader reader) {
-        this.reader = reader;
+    public void setFrameReader(int inputIdx, IFrameReader reader) {
+        this.readers[inputIdx] = reader;
     }
 
     public void abort() {
@@ -50,20 +57,28 @@
     public void run() {
         try {
             opNode.initialize();
-            if (reader != null) {
-                IFrameWriter writer = opNode.getInputFrameWriter(0);
-                writer.open();
-                reader.open();
-                while (readFrame()) {
-                    if (abort) {
-                        break;
-                    }
-                    buffer.flip();
-                    writer.nextFrame(buffer);
-                    buffer.compact();
+            if (nInputs > 0) {
+                final Semaphore sem = new Semaphore(nInputs - 1);
+                for (int i = 1; i < nInputs; ++i) {
+                    final IFrameReader reader = readers[i];
+                    final IFrameWriter writer = opNode.getInputFrameWriter(i);
+                    sem.acquire();
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            try {
+                                pushFrames(reader, writer);
+                            } catch (HyracksDataException e) {
+                            } finally {
+                                sem.release();
+                            }
+                        }
+                    });
                 }
-                reader.close();
-                writer.close();
+                try {
+                    pushFrames(readers[0], opNode.getInputFrameWriter(0));
+                } finally {
+                    sem.acquire(nInputs - 1);
+                }
             }
             opNode.deinitialize();
         } catch (Exception e) {
@@ -71,8 +86,20 @@
         }
     }
 
-    protected boolean readFrame() throws HyracksDataException {
-        return reader.nextFrame(buffer);
+    private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
+        ByteBuffer buffer = ctx.allocateFrame();
+        writer.open();
+        reader.open();
+        while (reader.nextFrame(buffer)) {
+            if (abort) {
+                break;
+            }
+            buffer.flip();
+            writer.nextFrame(buffer);
+            buffer.compact();
+        }
+        reader.close();
+        writer.close();
     }
 
     @Override