Merged trunk -r 363:437 into branch
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@468 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-nc/.settings/org.eclipse.jdt.core.prefs b/hyracks-control-nc/.settings/org.eclipse.jdt.core.prefs
index 05fa00c..dfac000 100644
--- a/hyracks-control-nc/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-control-nc/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Tue Sep 28 12:04:33 PDT 2010
+#Fri May 20 19:34:05 PDT 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-control-nc/pom.xml b/hyracks-control-nc/pom.xml
index a8c9226..b90f4ce 100644
--- a/hyracks-control-nc/pom.xml
+++ b/hyracks-control-nc/pom.xml
@@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-nc</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
</parent>
<build>
@@ -33,7 +33,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-common</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 6cc7450..413243e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
@@ -65,9 +66,9 @@
this.appCtx = appCtx;
this.jobId = jobId;
this.attempt = attempt;
- stageletMap = new HashMap<UUID, Stagelet>();
- envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
- counterMap = new HashMap<String, Counter>();
+ stageletMap = new Hashtable<UUID, Stagelet>();
+ envMap = new Hashtable<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+ counterMap = new Hashtable<String, Counter>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
}
@@ -78,8 +79,10 @@
}
public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
- if (!envMap.containsKey(hod.getOperatorId())) {
- envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+ synchronized (envMap) {
+ if (!envMap.containsKey(hod.getOperatorId())) {
+ envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+ }
}
Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
if (!opEnvMap.containsKey(partition)) {
@@ -118,7 +121,7 @@
return nodeController.getExecutor();
}
- public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
+ public void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
stageletMap.remove(stageId);
nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
}
@@ -132,15 +135,19 @@
return nodeController;
}
- public synchronized void dumpProfile(JobletProfile jProfile) {
- Map<String, Long> counters = jProfile.getCounters();
- for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
- counters.put(e.getKey(), e.getValue().get());
+ public void dumpProfile(JobletProfile jProfile) {
+ synchronized (counterMap) {
+ Map<String, Long> counters = jProfile.getCounters();
+ for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+ counters.put(e.getKey(), e.getValue().get());
+ }
}
- for (Stagelet si : stageletMap.values()) {
- StageletProfile sProfile = new StageletProfile(si.getStageId());
- si.dumpProfile(sProfile);
- jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+ synchronized (stageletMap) {
+ for (Stagelet si : stageletMap.values()) {
+ StageletProfile sProfile = new StageletProfile(si.getStageId());
+ si.dumpProfile(sProfile);
+ jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+ }
}
}
@@ -193,12 +200,14 @@
}
@Override
- public synchronized ICounter getCounter(String name, boolean create) {
- Counter counter = counterMap.get(name);
- if (counter == null && create) {
- counter = new Counter(name);
- counterMap.put(name, counter);
+ public ICounter getCounter(String name, boolean create) {
+ synchronized (counterMap) {
+ Counter counter = counterMap.get(name);
+ if (counter == null && create) {
+ counter = new Counter(name);
+ counterMap.put(name, counter);
+ }
+ return counter;
}
- return counter;
}
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2702386..be48ae6 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -129,7 +129,7 @@
}
nodeCapability = computeNodeCapability();
connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
- jobletMap = new HashMap<UUID, Joblet>();
+ jobletMap = new Hashtable<UUID, Joblet>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
NodeControllerService.class.getName()), id));
@@ -251,13 +251,11 @@
for (int i : tasks.get(hanId)) {
IOperatorNodePushable hon = han.createPushRuntime(stagelet, joblet.getEnvironment(op, i), rdp, i,
opNumPartitions.get(op.getOperatorId()));
- OperatorRunnable or = new OperatorRunnable(stagelet, hon);
+ OperatorRunnable or = new OperatorRunnable(stagelet, hon, inputs == null ? 0 : inputs.size(),
+ executor);
stagelet.setOperator(op.getOperatorId(), i, or);
if (inputs != null) {
for (int j = 0; j < inputs.size(); ++j) {
- if (j >= 1) {
- throw new IllegalStateException();
- }
IConnectorDescriptor conn = inputs.get(j);
OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
.getOperatorId();
@@ -276,7 +274,7 @@
portMap.put(piId, endpoint);
IFrameReader reader = createReader(stagelet, conn, drlf, i, plan, stagelet,
opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
- or.setFrameReader(reader);
+ or.setFrameReader(j, reader);
}
}
honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
@@ -438,19 +436,20 @@
si.setEndpointList(null);
}
- private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+ private Joblet getLocalJoblet(UUID jobId) throws Exception {
Joblet ji = jobletMap.get(jobId);
return ji;
}
- private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx)
- throws Exception {
- Joblet ji = jobletMap.get(jobId);
- if (ji == null || ji.getAttempt() != attempt) {
- ji = new Joblet(this, jobId, attempt, appCtx);
- jobletMap.put(jobId, ji);
+ private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx) throws Exception {
+ synchronized (jobletMap) {
+ Joblet ji = jobletMap.get(jobId);
+ if (ji == null || ji.getAttempt() != attempt) {
+ ji = new Joblet(this, jobId, attempt, appCtx);
+ jobletMap.put(jobId, ji);
+ }
+ return ji;
}
- return ji;
}
public Executor getExecutor() {
@@ -458,7 +457,7 @@
}
@Override
- public synchronized void cleanUpJob(UUID jobId) throws Exception {
+ public void cleanUpJob(UUID jobId) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
@@ -563,7 +562,7 @@
}
@Override
- public synchronized void abortJoblet(UUID jobId, int attempt) throws Exception {
+ public void abortJoblet(UUID jobId, int attempt) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Job: " + jobId + ":" + attempt);
}
@@ -588,7 +587,7 @@
if (applications.containsKey(appName)) {
throw new HyracksException("Duplicate application with name: " + appName + " being created.");
}
- appCtx = new NCApplicationContext(serverCtx, ctx, appName);
+ appCtx = new NCApplicationContext(serverCtx, ctx, appName, id);
applications.put(appName, appCtx);
}
if (deployHar) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index eb283c2..68b2cad 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -146,9 +146,13 @@
});
}
- protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
- pendingOperators.remove(opIId);
- if (pendingOperators.isEmpty()) {
+ protected void notifyOperatorCompletion(OperatorInstanceId opIId) {
+ boolean done = false;
+ synchronized (pendingOperators) {
+ pendingOperators.remove(opIId);
+ done = pendingOperators.isEmpty();
+ }
+ if (done) {
try {
StageletProfile sProfile = new StageletProfile(stageId);
dumpProfile(sProfile);
@@ -227,7 +231,7 @@
}
@Override
- public ICounter getCounter(String name, boolean create) {
+ public synchronized ICounter getCounter(String name, boolean create) {
Counter counter = counterMap.get(name);
if (counter == null && create) {
counter = new Counter(name);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
index 349743b..06f4212 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -10,16 +10,23 @@
import edu.uci.ics.hyracks.control.common.context.ServerContext;
public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+ private final String nodeId;
private final IHyracksRootContext rootCtx;
private Object appObject;
- public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String appName)
+ public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String appName, String nodeId)
throws IOException {
super(serverCtx, appName);
+ this.nodeId = nodeId;
this.rootCtx = rootCtx;
}
@Override
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ @Override
public void setDistributedState(Serializable state) {
distributedState = state;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index eca7fd0..8edd992 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.control.nc.runtime;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,22 +26,27 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class OperatorRunnable implements Runnable {
+ private final IHyracksStageletContext ctx;
private final IOperatorNodePushable opNode;
- private IFrameReader reader;
- private ByteBuffer buffer;
+ private final int nInputs;
+ private final Executor executor;
+ private IFrameReader[] readers;
private volatile boolean abort;
- public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode) {
+ public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode, int nInputs, Executor executor) {
+ this.ctx = ctx;
this.opNode = opNode;
- buffer = ctx.allocateFrame();
+ this.nInputs = nInputs;
+ this.executor = executor;
+ readers = new IFrameReader[nInputs];
}
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
opNode.setOutputFrameWriter(index, writer, recordDesc);
}
- public void setFrameReader(IFrameReader reader) {
- this.reader = reader;
+ public void setFrameReader(int inputIdx, IFrameReader reader) {
+ this.readers[inputIdx] = reader;
}
public void abort() {
@@ -50,20 +57,28 @@
public void run() {
try {
opNode.initialize();
- if (reader != null) {
- IFrameWriter writer = opNode.getInputFrameWriter(0);
- writer.open();
- reader.open();
- while (readFrame()) {
- if (abort) {
- break;
- }
- buffer.flip();
- writer.nextFrame(buffer);
- buffer.compact();
+ if (nInputs > 0) {
+ final Semaphore sem = new Semaphore(nInputs - 1);
+ for (int i = 1; i < nInputs; ++i) {
+ final IFrameReader reader = readers[i];
+ final IFrameWriter writer = opNode.getInputFrameWriter(i);
+ sem.acquire();
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ pushFrames(reader, writer);
+ } catch (HyracksDataException e) {
+ } finally {
+ sem.release();
+ }
+ }
+ });
}
- reader.close();
- writer.close();
+ try {
+ pushFrames(readers[0], opNode.getInputFrameWriter(0));
+ } finally {
+ sem.acquire(nInputs - 1);
+ }
}
opNode.deinitialize();
} catch (Exception e) {
@@ -71,8 +86,20 @@
}
}
- protected boolean readFrame() throws HyracksDataException {
- return reader.nextFrame(buffer);
+ private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
+ ByteBuffer buffer = ctx.allocateFrame();
+ writer.open();
+ reader.open();
+ while (reader.nextFrame(buffer)) {
+ if (abort) {
+ break;
+ }
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
+ }
+ reader.close();
+ writer.close();
}
@Override