Renamed hyracks-core to hyracks-runtime

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@41 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-runtime/.classpath b/hyracks-runtime/.classpath
new file mode 100644
index 0000000..2fb3f21
--- /dev/null
+++ b/hyracks-runtime/.classpath
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" path="src/main/java"/>
+	<classpathentry kind="src" path="src/test/java"/>
+	<classpathentry kind="src" path="src/main/resources"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-runtime/.project b/hyracks-runtime/.project
new file mode 100644
index 0000000..48ac49b
--- /dev/null
+++ b/hyracks-runtime/.project
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>hyracks-runtime</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.maven.ide.eclipse.maven2Builder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>org.maven.ide.eclipse.maven2Nature</nature>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/hyracks-runtime/pom.xml b/hyracks-runtime/pom.xml
new file mode 100644
index 0000000..c779372
--- /dev/null
+++ b/hyracks-runtime/pom.xml
@@ -0,0 +1,86 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>edu.uci.ics.hyracks</groupId>
+  <artifactId>hyracks-runtime</artifactId>
+  <version>0.1.0</version>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <forkMode>pertest</forkMode>
+          <argLine>-enableassertions</argLine>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>org.eclipse.jetty</groupId>
+  		<artifactId>jetty-server</artifactId>
+  		<version>8.0.0.M0</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>junit</groupId>
+  		<artifactId>junit</artifactId>
+  		<version>4.8.1</version>
+  		<type>jar</type>
+  		<scope>test</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>args4j</groupId>
+  		<artifactId>args4j</artifactId>
+  		<version>2.0.12</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.dcache</groupId>
+  		<artifactId>dcache-client</artifactId>
+  		<version>0.0.1</version>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.wicket</groupId>
+  		<artifactId>wicket</artifactId>
+  		<version>1.4.7</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>jol</groupId>
+  		<artifactId>jol</artifactId>
+  		<version>1.0.0</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-api</artifactId>
+  		<version>0.1.0</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  </dependencies>
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <version>2.0.1</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
new file mode 100644
index 0000000..bfc79e7
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+
+public class ConnectionEntry implements IConnectionEntry {
+    private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
+
+    private SocketChannel socketChannel;
+
+    private final ByteBuffer readBuffer;
+
+    private final ByteBuffer writeBuffer;
+
+    private IDataReceiveListener recvListener;
+
+    private Object attachment;
+
+    private final SelectionKey key;
+
+    private UUID jobId;
+
+    private UUID stageId;
+
+    private boolean aborted;
+
+    public ConnectionEntry(IHyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
+        this.socketChannel = socketChannel;
+        readBuffer = ctx.getResourceManager().allocateFrame();
+        readBuffer.clear();
+        writeBuffer = ctx.getResourceManager().allocateFrame();
+        writeBuffer.clear();
+        this.key = key;
+    }
+
+    public SocketChannel getSocketChannel() {
+        return socketChannel;
+    }
+
+    public boolean dispatch(SelectionKey key) throws IOException {
+        if (aborted) {
+            recvListener.dataReceived(this);
+        } else {
+            if (key.isReadable()) {
+                if (LOGGER.isLoggable(Level.FINER)) {
+                    LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
+                }
+                int bytesRead = socketChannel.read(readBuffer);
+                if (bytesRead < 0) {
+                    recvListener.eos(this);
+                    return true;
+                }
+                if (LOGGER.isLoggable(Level.FINER)) {
+                    LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
+                }
+                recvListener.dataReceived(this);
+            } else if (key.isWritable()) {
+                synchronized (this) {
+                    writeBuffer.flip();
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    int bytesWritten = socketChannel.write(writeBuffer);
+                    if (bytesWritten < 0) {
+                        return true;
+                    }
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    if (writeBuffer.remaining() <= 0) {
+                        int ops = key.interestOps();
+                        key.interestOps(ops & ~SelectionKey.OP_WRITE);
+                    }
+                    writeBuffer.compact();
+                    notifyAll();
+                }
+            } else {
+                LOGGER.warning("Spurious event triggered: " + key.readyOps());
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public ByteBuffer getReadBuffer() {
+        return readBuffer;
+    }
+
+    @Override
+    public synchronized void write(ByteBuffer buffer) {
+        while (buffer.remaining() > 0) {
+            while (writeBuffer.remaining() <= 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+            int oldLimit = buffer.limit();
+            buffer.limit(Math.min(oldLimit, writeBuffer.remaining()));
+            writeBuffer.put(buffer);
+            buffer.limit(oldLimit);
+            int ops = key.interestOps();
+            key.interestOps(ops | SelectionKey.OP_WRITE);
+            key.selector().wakeup();
+        }
+    }
+
+    @Override
+    public void setDataReceiveListener(IDataReceiveListener listener) {
+        this.recvListener = listener;
+    }
+
+    @Override
+    public void attach(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public void close() {
+        try {
+            socketChannel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public SelectionKey getSelectionKey() {
+        return key;
+    }
+
+    @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public void setJobId(UUID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public UUID getStageId() {
+        return stageId;
+    }
+
+    @Override
+    public void setStageId(UUID stageId) {
+        this.stageId = stageId;
+    }
+
+    @Override
+    public void abort() {
+        aborted = true;
+    }
+
+    @Override
+    public boolean aborted() {
+        return aborted;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
new file mode 100644
index 0000000..45c7221
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -0,0 +1,394 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.FrameConstants;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ConnectionManager {
+    private static final Logger LOGGER = Logger.getLogger(ConnectionManager.class.getName());
+
+    private static final int INITIAL_MESSAGE_LEN = 20;
+
+    private NetworkAddress networkAddress;
+
+    private ServerSocketChannel serverSocketChannel;
+
+    private final IHyracksContext ctx;
+
+    private final Map<UUID, IDataReceiveListenerFactory> pendingConnectionReceivers;
+
+    private final ConnectionListenerThread connectionListenerThread;
+
+    private final DataListenerThread dataListenerThread;
+
+    private final IDataReceiveListener initialDataReceiveListener;
+
+    private final Set<IConnectionEntry> connections;
+
+    private volatile boolean stopped;
+
+    private ByteBuffer emptyFrame;
+
+    public ConnectionManager(IHyracksContext ctx, InetAddress address) throws IOException {
+        this.ctx = ctx;
+        serverSocketChannel = ServerSocketChannel.open();
+        ServerSocket serverSocket = serverSocketChannel.socket();
+        serverSocket.bind(new InetSocketAddress(address, 0));
+
+        networkAddress = new NetworkAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort());
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Connection manager listening on " + serverSocket.getInetAddress() + ":"
+                    + serverSocket.getLocalPort());
+        }
+
+        pendingConnectionReceivers = new HashMap<UUID, IDataReceiveListenerFactory>();
+        dataListenerThread = new DataListenerThread();
+        connectionListenerThread = new ConnectionListenerThread();
+        initialDataReceiveListener = new InitialDataReceiveListener();
+        emptyFrame = ctx.getResourceManager().allocateFrame();
+        emptyFrame.putInt(FrameHelper.getTupleCountOffset(ctx), 0);
+        connections = new HashSet<IConnectionEntry>();
+    }
+
+    public synchronized void dumpStats() {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Number of pendingConnectionReceivers: " + pendingConnectionReceivers.size());
+            LOGGER.info("Number of selectable keys: " + dataListenerThread.selector.keys().size());
+        }
+    }
+
+    public NetworkAddress getNetworkAddress() {
+        return networkAddress;
+    }
+
+    public void start() {
+        stopped = false;
+        connectionListenerThread.start();
+        dataListenerThread.start();
+    }
+
+    public void stop() {
+        try {
+            stopped = true;
+            serverSocketChannel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public IFrameWriter connect(NetworkAddress address, UUID id, int senderId) throws HyracksDataException {
+        try {
+            SocketChannel channel = SocketChannel
+                    .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
+            byte[] initialFrame = new byte[INITIAL_MESSAGE_LEN];
+            ByteBuffer buffer = ByteBuffer.wrap(initialFrame);
+            buffer.clear();
+            buffer.putLong(id.getMostSignificantBits());
+            buffer.putLong(id.getLeastSignificantBits());
+            buffer.putInt(senderId);
+            buffer.flip();
+            int bytesWritten = 0;
+            while (bytesWritten < INITIAL_MESSAGE_LEN) {
+                int n = channel.write(buffer);
+                if (n < 0) {
+                    throw new HyracksDataException("Stream closed prematurely");
+                }
+                bytesWritten += n;
+            }
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Send Initial message: " + id + ":" + senderId);
+            }
+            buffer.clear();
+            buffer.limit(FrameConstants.SIZE_LEN);
+            int bytesRead = 0;
+            while (bytesRead < FrameConstants.SIZE_LEN) {
+                int n = channel.read(buffer);
+                if (n < 0) {
+                    throw new HyracksDataException("Stream closed prematurely");
+                }
+                bytesRead += n;
+            }
+            buffer.flip();
+            int frameLen = buffer.getInt();
+            if (frameLen != FrameConstants.SIZE_LEN) {
+                throw new IllegalStateException("Received illegal framelen = " + frameLen);
+            }
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Got Ack message: " + id + ":" + senderId);
+            }
+            return new NetworkFrameWriter(channel);
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public synchronized void acceptConnection(UUID id, IDataReceiveListenerFactory receiver) {
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.info("Connection manager accepting " + id);
+        }
+        pendingConnectionReceivers.put(id, receiver);
+    }
+
+    public synchronized void unacceptConnection(UUID id) {
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.info("Connection manager unaccepting " + id);
+        }
+        pendingConnectionReceivers.remove(id);
+    }
+
+    public synchronized void abortConnections(UUID jobId, UUID stageId) {
+        List<IConnectionEntry> abortConnections = new ArrayList<IConnectionEntry>();
+        synchronized (this) {
+            for (IConnectionEntry ce : connections) {
+                if (ce.getJobId().equals(jobId) && ce.getStageId().equals(stageId)) {
+                    abortConnections.add(ce);
+                }
+            }
+        }
+        dataListenerThread.addPendingAbortConnections(abortConnections);
+    }
+
+    private final class NetworkFrameWriter implements IFrameWriter {
+        private SocketChannel channel;
+
+        NetworkFrameWriter(SocketChannel channel) {
+            this.channel = channel;
+        }
+
+        @Override
+        public void close() throws HyracksDataException {
+            try {
+                synchronized (emptyFrame) {
+                    emptyFrame.position(0);
+                    emptyFrame.limit(emptyFrame.capacity());
+                    channel.write(emptyFrame);
+                }
+                channel.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            try {
+                if (LOGGER.isLoggable(Level.FINER)) {
+                    int frameLen = buffer.getInt(buffer.position());
+                    LOGGER.finer("ConnectionManager.NetworkFrameWriter: frameLen = " + frameLen);
+                }
+                while (buffer.remaining() > 0) {
+                    channel.write(buffer);
+                }
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+        }
+    }
+
+    private final class ConnectionListenerThread extends Thread {
+        public ConnectionListenerThread() {
+            super("Hyracks Connection Listener Thread");
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                try {
+                    SocketChannel sc = serverSocketChannel.accept();
+                    dataListenerThread.addSocketChannel(sc);
+                } catch (AsynchronousCloseException e) {
+                    // do nothing
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private final class DataListenerThread extends Thread {
+        private Selector selector;
+
+        private List<SocketChannel> pendingNewSockets;
+        private List<IConnectionEntry> pendingAbortConnections;
+
+        public DataListenerThread() {
+            super("Hyracks Data Listener Thread");
+            try {
+                selector = Selector.open();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            pendingNewSockets = new ArrayList<SocketChannel>();
+            pendingAbortConnections = new ArrayList<IConnectionEntry>();
+        }
+
+        synchronized void addSocketChannel(SocketChannel sc) throws IOException {
+            pendingNewSockets.add(sc);
+            selector.wakeup();
+        }
+
+        synchronized void addPendingAbortConnections(List<IConnectionEntry> abortConnections) {
+            pendingAbortConnections.addAll(abortConnections);
+            selector.wakeup();
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                try {
+                    if (LOGGER.isLoggable(Level.FINE)) {
+                        LOGGER.fine("Starting Select");
+                    }
+                    int n = selector.select();
+                    synchronized (this) {
+                        if (!pendingNewSockets.isEmpty()) {
+                            for (SocketChannel sc : pendingNewSockets) {
+                                sc.configureBlocking(false);
+                                SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
+                                ConnectionEntry entry = new ConnectionEntry(ctx, sc, scKey);
+                                entry.setDataReceiveListener(initialDataReceiveListener);
+                                scKey.attach(entry);
+                                if (LOGGER.isLoggable(Level.FINE)) {
+                                    LOGGER.fine("Woke up selector");
+                                }
+                            }
+                            pendingNewSockets.clear();
+                        }
+                        if (!pendingAbortConnections.isEmpty()) {
+                            for (IConnectionEntry ce : pendingAbortConnections) {
+                                SelectionKey key = ce.getSelectionKey();
+                                ce.abort();
+                                ((ConnectionEntry) ce).dispatch(key);
+                                key.cancel();
+                                ce.close();
+                                synchronized (ConnectionManager.this) {
+                                    connections.remove(ce);
+                                }
+                            }
+                            pendingAbortConnections.clear();
+                        }
+                        if (LOGGER.isLoggable(Level.FINE)) {
+                            LOGGER.fine("Selector: " + n);
+                        }
+                        if (n > 0) {
+                            for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
+                                SelectionKey key = i.next();
+                                i.remove();
+                                ConnectionEntry entry = (ConnectionEntry) key.attachment();
+                                boolean close = false;
+                                try {
+                                    close = entry.dispatch(key);
+                                } catch (IOException e) {
+                                    e.printStackTrace();
+                                    close = true;
+                                }
+                                if (close) {
+                                    key.cancel();
+                                    entry.close();
+                                    synchronized (ConnectionManager.this) {
+                                        connections.remove(entry);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private class InitialDataReceiveListener implements IDataReceiveListener {
+        @Override
+        public void dataReceived(IConnectionEntry entry) throws IOException {
+            ByteBuffer buffer = entry.getReadBuffer();
+            buffer.flip();
+            IDataReceiveListener newListener = null;
+            if (buffer.remaining() >= INITIAL_MESSAGE_LEN) {
+                long msb = buffer.getLong();
+                long lsb = buffer.getLong();
+                UUID endpointID = new UUID(msb, lsb);
+                int senderId = buffer.getInt();
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Initial Frame received: " + endpointID + ":" + senderId);
+                }
+                IDataReceiveListenerFactory connectionReceiver;
+                synchronized (ConnectionManager.this) {
+                    connectionReceiver = pendingConnectionReceivers.get(endpointID);
+                    if (connectionReceiver == null) {
+                        entry.close();
+                        return;
+                    }
+                }
+
+                newListener = connectionReceiver.getDataReceiveListener(endpointID, entry, senderId);
+                entry.setDataReceiveListener(newListener);
+                entry.setJobId(connectionReceiver.getJobId());
+                entry.setStageId(connectionReceiver.getStageId());
+                synchronized (ConnectionManager.this) {
+                    connections.add(entry);
+                }
+                byte[] ack = new byte[4];
+                ByteBuffer ackBuffer = ByteBuffer.wrap(ack);
+                ackBuffer.clear();
+                ackBuffer.putInt(FrameConstants.SIZE_LEN);
+                ackBuffer.flip();
+                entry.write(ackBuffer);
+            }
+            buffer.compact();
+            if (newListener != null && buffer.remaining() > 0) {
+                newListener.dataReceived(entry);
+            }
+        }
+
+        @Override
+        public void eos(IConnectionEntry entry) {
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
new file mode 100644
index 0000000..403d4e8
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DemuxDataReceiveListenerFactory implements IDataReceiveListenerFactory, IConnectionDemultiplexer,
+        IDataReceiveListener {
+    private static final Logger LOGGER = Logger.getLogger(DemuxDataReceiveListenerFactory.class.getName());
+
+    private final IHyracksContext ctx;
+    private final BitSet readyBits;
+    private IConnectionEntry senders[];
+    private int openSenderCount;
+    private UUID jobId;
+    private UUID stageId;
+
+    public DemuxDataReceiveListenerFactory(IHyracksContext ctx, UUID jobId, UUID stageId) {
+        this.ctx = ctx;
+        this.jobId = jobId;
+        this.stageId = stageId;
+        readyBits = new BitSet();
+        senders = null;
+        openSenderCount = 0;
+    }
+
+    @Override
+    public IDataReceiveListener getDataReceiveListener(UUID endpointUUID, IConnectionEntry entry, int senderIndex) {
+        entry.attach(senderIndex);
+        addSender(senderIndex, entry);
+        return this;
+    }
+
+    @Override
+    public synchronized void dataReceived(IConnectionEntry entry) throws IOException {
+        int senderIndex = (Integer) entry.getAttachment();
+        ByteBuffer buffer = entry.getReadBuffer();
+        buffer.flip();
+        int dataLen = buffer.remaining();
+        if (dataLen >= ctx.getFrameSize() || entry.aborted()) {
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest("NonDeterministicDataReceiveListener: frame received: sender = " + senderIndex);
+            }
+            SelectionKey key = entry.getSelectionKey();
+            if (key.isValid()) {
+                int ops = key.interestOps();
+                key.interestOps(ops & ~SelectionKey.OP_READ);
+            }
+            readyBits.set(senderIndex);
+            notifyAll();
+            return;
+        }
+        buffer.compact();
+    }
+
+    @Override
+    public void eos(IConnectionEntry entry) {
+    }
+
+    private synchronized void addSender(int senderIndex, IConnectionEntry entry) {
+        readyBits.clear(senderIndex);
+        if (senders == null) {
+            senders = new IConnectionEntry[senderIndex + 1];
+        } else if (senders.length <= senderIndex) {
+            senders = Arrays.copyOf(senders, senderIndex + 1);
+        }
+        senders[senderIndex] = entry;
+        ++openSenderCount;
+    }
+
+    @Override
+    public synchronized IConnectionEntry findNextReadyEntry(int lastReadSender) {
+        while (openSenderCount > 0 && readyBits.isEmpty()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+            }
+        }
+        lastReadSender = readyBits.nextSetBit(lastReadSender);
+        if (lastReadSender < 0) {
+            lastReadSender = readyBits.nextSetBit(0);
+        }
+        return senders[lastReadSender];
+    }
+
+    @Override
+    public synchronized void unreadyEntry(int index) {
+        readyBits.clear(index);
+        IConnectionEntry entry = senders[index];
+        SelectionKey key = entry.getSelectionKey();
+        if (key.isValid()) {
+            int ops = key.interestOps();
+            key.interestOps(ops | SelectionKey.OP_READ);
+            key.selector().wakeup();
+        }
+    }
+
+    @Override
+    public synchronized int closeEntry(int index) throws HyracksDataException {
+        IConnectionEntry entry = senders[index];
+        SelectionKey key = entry.getSelectionKey();
+        key.cancel();
+        try {
+            entry.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        return --openSenderCount;
+    }
+
+    @Override
+    public synchronized int getSenderCount() {
+        return senders.length;
+    }
+
+    @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public UUID getStageId() {
+        return stageId;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/Endpoint.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/Endpoint.java
new file mode 100644
index 0000000..245c777
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/Endpoint.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Identifies one end of a data connection: a network address paired with a
+ * randomly generated unique endpoint id, tagged with the index of the
+ * receiver partition it feeds. Instances are immutable and serializable so
+ * they can be shipped between controllers.
+ */
+public final class Endpoint implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final UUID id;
+
+    private final NetworkAddress address;
+
+    private final int receiverIndex;
+
+    // Note: the original declared "throws Exception" here, but no code path can
+    // throw; the over-broad checked exception is dropped (backward-compatible).
+    public Endpoint(NetworkAddress address, int receiverIndex) {
+        id = UUID.randomUUID();
+        this.address = address;
+        this.receiverIndex = receiverIndex;
+    }
+
+    public UUID getEndpointId() {
+        return id;
+    }
+
+    public NetworkAddress getNetworkAddress() {
+        return address;
+    }
+
+    public int getReceiverIndex() {
+        return receiverIndex;
+    }
+
+    @Override
+    public int hashCode() {
+        // Consistent with equals: combines only id and address.
+        return id.hashCode() + address.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof Endpoint)) {
+            return false;
+        }
+        Endpoint oe = (Endpoint) o;
+        // receiverIndex is deliberately excluded: identity is (id, address).
+        return oe.id.equals(id) && oe.address.equals(address);
+    }
+
+    @Override
+    public String toString() {
+        return "[" + address + ":" + id + "]";
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/IReceiverProtocolStack.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/IReceiverProtocolStack.java
new file mode 100644
index 0000000..b7effd5
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/IReceiverProtocolStack.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import edu.uci.ics.hyracks.api.comm.IDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+
+/**
+ * Factory abstraction for the receive side of a data connection: supplies the
+ * listener factory used to accept incoming data and the frame reader used to
+ * consume it.
+ */
+public interface IReceiverProtocolStack {
+    public IDataReceiveListenerFactory createDataReceiveListenerFactory();
+
+    public IFrameReader createFrameReader();
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ISenderProtocolStack.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ISenderProtocolStack.java
new file mode 100644
index 0000000..e038379
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/ISenderProtocolStack.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory abstraction for the send side of a data connection: creates a frame
+ * writer that delivers frames to the given remote endpoint.
+ */
+public interface ISenderProtocolStack {
+    public IFrameWriter createFrameWriter(Endpoint endpoint) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/NetworkAddress.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/NetworkAddress.java
new file mode 100644
index 0000000..15db351
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/comm/NetworkAddress.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.comm;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+/**
+ * Immutable, serializable (IP address, port) pair used to locate a data
+ * listener on a node controller.
+ */
+public final class NetworkAddress implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final InetAddress ipAddress;
+
+    private final int port;
+
+    public NetworkAddress(InetAddress ipAddress, int port) {
+        this.ipAddress = ipAddress;
+        this.port = port;
+    }
+
+    public InetAddress getIpAddress() {
+        return ipAddress;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public String toString() {
+        return ipAddress + ":" + port;
+    }
+
+    @Override
+    public int hashCode() {
+        // Consistent with equals: derived from both ipAddress and port.
+        return ipAddress.hashCode() + port;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof NetworkAddress)) {
+            return false;
+        }
+        NetworkAddress on = (NetworkAddress) o;
+        return on.port == port && on.ipAddress.equals(ipAddress);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
new file mode 100644
index 0000000..3d3652a
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.config;
+
+import org.kohsuke.args4j.Option;
+
+/**
+ * Command-line configuration for the Cluster Controller, populated by args4j
+ * from the {@code @Option} annotations below.
+ */
+public class CCConfig {
+    @Option(name = "-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
+    public int port = 1099;
+
+    // No default: 0 if unset, which the web server interprets per its own rules.
+    @Option(name = "-http-port", usage = "Sets the http port for the admin console")
+    public int httpPort;
+
+    @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+    public int heartbeatPeriod = 10000;
+
+    @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+    public int maxHeartbeatLapsePeriods = 5;
+
+    @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
+    public boolean useJOL = false;
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/config/NCConfig.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/config/NCConfig.java
new file mode 100644
index 0000000..c95f1ad
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/config/NCConfig.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.config;
+
+import java.io.Serializable;
+
+import org.kohsuke.args4j.Option;
+
+/**
+ * Command-line configuration for a Node Controller, populated by args4j from
+ * the {@code @Option} annotations below. Serializable so it can be shipped to
+ * remote processes.
+ */
+public class NCConfig implements Serializable {
+    // Added: the class is Serializable but previously declared no
+    // serialVersionUID, so any recompile could break wire compatibility.
+    private static final long serialVersionUID = 1L;
+
+    @Option(name = "-cc-host", usage = "Cluster Controller host name")
+    public String ccHost;
+
+    @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+    public int ccPort = 1099;
+
+    @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster")
+    public String nodeId;
+
+    @Option(name = "-data-ip-address", usage = "IP Address to bind data listener")
+    public String dataIPAddress;
+
+    @Option(name = "-frame-size", usage = "Frame Size to use for data communication (default: 32768)")
+    public int frameSize = 32768;
+
+    @Option(name = "-dcache-client-servers", usage = "Sets the list of DCache servers in the format host1:port1,host2:port2,... (default localhost:54583)")
+    public String dcacheClientServers = "localhost:54583";
+
+    @Option(name = "-dcache-client-server-local", usage = "Sets the local DCache server, if one is available, in the format host:port (default not set)")
+    public String dcacheClientServerLocal;
+
+    @Option(name = "-dcache-client-path", usage = "Sets the path to store the files retrieved from the DCache server (default /tmp/dcache-client)")
+    public String dcacheClientPath = "/tmp/dcache-client";
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java
new file mode 100644
index 0000000..b061919
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/context/HyracksContext.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.context;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
+import edu.uci.ics.hyracks.resources.ResourceManager;
+
+/**
+ * Concrete per-runtime context: owns the resource manager and fixes the frame
+ * size used for all data communication in this runtime.
+ */
+public class HyracksContext implements IHyracksContext {
+    private final IResourceManager resourceManager;
+    private final int frameSize;
+
+    public HyracksContext(int frameSize) {
+        // Note: passes a partially-constructed 'this' to ResourceManager; safe
+        // only as long as ResourceManager does not call back during construction.
+        resourceManager = new ResourceManager(this);
+        this.frameSize = frameSize;
+    }
+
+    @Override
+    public IResourceManager getResourceManager() {
+        return resourceManager;
+    }
+
+    @Override
+    public int getFrameSize() {
+        return frameSize;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/AbstractRemoteService.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/AbstractRemoteService.java
new file mode 100644
index 0000000..9c8f3dd
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/AbstractRemoteService.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller;
+
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+
+import edu.uci.ics.hyracks.service.IService;
+
+/**
+ * Base class for RMI-exported services. Extending UnicastRemoteObject exports
+ * the object for remote invocation at construction time, which is why the
+ * constructor can throw RemoteException.
+ */
+public abstract class AbstractRemoteService extends UnicastRemoteObject implements IService {
+    private static final long serialVersionUID = 1L;
+
+    public AbstractRemoteService() throws RemoteException {
+    }
+}
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/NodeCapability.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/NodeCapability.java
new file mode 100644
index 0000000..65fcd44
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/NodeCapability.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean describing a node's hardware capability; currently only
+ * the CPU count is tracked.
+ */
+public class NodeCapability implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int cpuCount;
+
+    public int getCPUCount() {
+        return cpuCount;
+    }
+
+    public void setCPUCount(int cpuCount) {
+        this.cpuCount = cpuCount;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/NodeParameters.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/NodeParameters.java
new file mode 100644
index 0000000..b04ad9d
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/NodeParameters.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean of parameters the Cluster Controller hands back to a node
+ * on registration; currently only the heartbeat period (in milliseconds).
+ */
+public class NodeParameters implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int heartbeatPeriod;
+
+    public int getHeartbeatPeriod() {
+        return heartbeatPeriod;
+    }
+
+    public void setHeartbeatPeriod(int heartbeatPeriod) {
+        this.heartbeatPeriod = heartbeatPeriod;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
new file mode 100644
index 0000000..94413f4
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -0,0 +1,562 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import jol.core.Runtime;
+import jol.core.Runtime.DebugLevel;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+
+import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.comm.Endpoint;
+import edu.uci.ics.hyracks.config.CCConfig;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
+import edu.uci.ics.hyracks.controller.NodeParameters;
+import edu.uci.ics.hyracks.controller.nodecontroller.INodeController;
+import edu.uci.ics.hyracks.job.JobPlan;
+import edu.uci.ics.hyracks.web.WebServer;
+
+public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
+    private static final long serialVersionUID = 1L;
+
+    private CCConfig ccConfig;
+
+    private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
+
+    private final Map<String, NodeControllerState> nodeRegistry;
+
+    private WebServer webServer;
+
+    private final IJobManager jobManager;
+
+    private final Executor taskExecutor;
+
+    private final Timer timer;
+
+    private Runtime jolRuntime;
+
+    // Builds the CC's core collaborators: node registry, JOL-based job manager,
+    // task executor, admin web server, and the dead-node sweeper timer.
+    public ClusterControllerService(CCConfig ccConfig) throws Exception {
+        this.ccConfig = ccConfig;
+        nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+        // Mirror JOL's debug verbosity off this service's JUL log level.
+        Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
+        jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
+        jobManager = new JOLJobManagerImpl(this, jolRuntime);
+        taskExecutor = Executors.newCachedThreadPool();
+        webServer = new WebServer(new Handler[] { getAdminConsoleHandler(), getApplicationDataHandler() });
+        // Daemon timer so the sweeper never keeps the JVM alive on shutdown.
+        this.timer = new Timer(true);
+    }
+
+    @Override
+    public void start() throws Exception {
+        LOGGER.log(Level.INFO, "Starting ClusterControllerService");
+        // Export this service over RMI under both the client-facing and the
+        // node-controller-facing interface names.
+        Registry registry = LocateRegistry.createRegistry(ccConfig.port);
+        registry.rebind(IHyracksClientInterface.class.getName(), this);
+        registry.rebind(IClusterController.class.getName(), this);
+        webServer.setPort(ccConfig.httpPort);
+        webServer.start();
+        // Begin sweeping for nodes whose heartbeats have lapsed.
+        timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
+        LOGGER.log(Level.INFO, "Started ClusterControllerService");
+    }
+
+    @Override
+    public void stop() throws Exception {
+        LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
+        // Only the web server is stopped here; the RMI registry and timer are
+        // left to die with the JVM (timer is a daemon thread).
+        webServer.stop();
+        LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
+    }
+
+    @Override
+    public UUID createJob(JobSpecification jobSpec) throws Exception {
+        // Convenience overload: create the job with no flags set.
+        return jobManager.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
+    }
+
+    @Override
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        // Delegates job creation entirely to the configured job manager.
+        return jobManager.createJob(jobSpec, jobFlags);
+    }
+
+    @Override
+    public NodeParameters registerNode(INodeController nodeController) throws Exception {
+        String id = nodeController.getId();
+        NodeControllerState state = new NodeControllerState(nodeController);
+        // Only the registry mutation is synchronized; the remote callbacks below
+        // are made outside the lock to avoid blocking other registrations.
+        synchronized (this) {
+            if (nodeRegistry.containsKey(id)) {
+                // Node ids must be unique cluster-wide.
+                throw new Exception("Node with this name already registered.");
+            }
+            nodeRegistry.put(id, state);
+        }
+        nodeController.notifyRegistration(this);
+        jobManager.registerNode(id);
+        LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+        // Hand the node the heartbeat period it must honor.
+        NodeParameters params = new NodeParameters();
+        params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
+        return params;
+    }
+
+    @Override
+    public void unregisterNode(INodeController nodeController) throws Exception {
+        String id = nodeController.getId();
+        synchronized (this) {
+            // Removing an unknown id is a no-op by Map semantics.
+            nodeRegistry.remove(id);
+        }
+        LOGGER.log(Level.INFO, "Unregistered INodeController");
+    }
+
+    // Returns the registered state for the node id, or null if unknown.
+    public synchronized NodeControllerState lookupNode(String id) throws Exception {
+        return nodeRegistry.get(id);
+    }
+
+    // Shared cached-thread-pool executor for remote operations and cleanup tasks.
+    public Executor getExecutor() {
+        return taskExecutor;
+    }
+
+    // Asks every registered node, asynchronously, to clean up state for the
+    // completed job. Best-effort: per-node failures are deliberately ignored.
+    public synchronized void notifyJobComplete(final UUID jobId) {
+        for (final NodeControllerState ns : nodeRegistry.values()) {
+            taskExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ns.getNodeController().cleanUpJob(jobId);
+                    } catch (Exception e) {
+                        // Ignored: the node may already be dead; the sweeper
+                        // will reap it independently.
+                    }
+                }
+
+            });
+        }
+    }
+
+    @Override
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+            StageletStatistics statistics) throws Exception {
+        // Delegates stagelet-completion bookkeeping to the job manager.
+        jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
+    }
+
+    @Override
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+        // Delegates stagelet-failure handling to the job manager.
+        jobManager.notifyStageletFailure(jobId, stageId, attempt, nodeId);
+    }
+
+    @Override
+    public JobStatus getJobStatus(UUID jobId) throws Exception {
+        // Pass-through query to the job manager.
+        return jobManager.getJobStatus(jobId);
+    }
+
+    @Override
+    public void start(UUID jobId) throws Exception {
+        // Begin executing a previously created job.
+        jobManager.start(jobId);
+    }
+
+    @Override
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+        // Blocks the caller until the job finishes, then returns its statistics.
+        return jobManager.waitForCompletion(jobId);
+    }
+
+    // Jetty handler serving the "/admin" console: a static HTML table listing
+    // each registered node and how many sweep periods since its last heartbeat.
+    private Handler getAdminConsoleHandler() {
+        ContextHandler handler = new ContextHandler("/admin");
+        handler.setHandler(new AbstractHandler() {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request,
+                    HttpServletResponse response) throws IOException, ServletException {
+                // Only the context root is served; anything else falls through.
+                if (!"/".equals(target)) {
+                    return;
+                }
+                response.setContentType("text/html;charset=utf-8");
+                response.setStatus(HttpServletResponse.SC_OK);
+                baseRequest.setHandled(true);
+                PrintWriter writer = response.getWriter();
+                writer.println("<html><head><title>Hyracks Admin Console</title></head><body>");
+                writer.println("<h1>Hyracks Admin Console</h1>");
+                writer.println("<h2>Node Controllers</h2>");
+                writer.println("<table><tr><td>Node Id</td><td>Host</td></tr>");
+                // Snapshot the registry under the service lock while rendering.
+                synchronized (ClusterControllerService.this) {
+                    for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+                        try {
+                            writer.print("<tr><td>");
+                            writer.print(e.getKey());
+                            writer.print("</td><td>");
+                            writer.print(e.getValue().getLastHeartbeatDuration());
+                            writer.print("</td></tr>");
+                        } catch (Exception ex) {
+                            // Best-effort rendering: skip a node whose state
+                            // cannot be read rather than fail the whole page.
+                        }
+                    }
+                }
+                writer.println("</table>");
+                writer.println("</body></html>");
+                writer.flush();
+            }
+        });
+        return handler;
+    }
+
+    // Jetty handler for "/applications": currently a placeholder that serves
+    // nothing (the handle method is intentionally empty).
+    private Handler getApplicationDataHandler() {
+        ContextHandler handler = new ContextHandler("/applications");
+        handler.setHandler(new AbstractHandler() {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request,
+                    HttpServletResponse response) throws IOException, ServletException {
+            }
+        });
+        return handler;
+    }
+
+    @Override
+    public Map<String, InetAddress[]> getRegistry() throws Exception {
+        // NOTE(review): unimplemented — the map built below is never returned
+        // (see the commented-out return); the method always throws. The map
+        // construction is dead code pending a real implementation. Also note
+        // the registry is iterated here without holding the service lock.
+        Map<String, INodeController> map = new HashMap<String, INodeController>();
+        for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+            map.put(e.getKey(), e.getValue().getNodeController());
+        }
+        // return map;
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void nodeHeartbeat(String id) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Heartbeat from: " + id);
+        }
+        // Reset the node's lapse counter; heartbeats from unknown (e.g. already
+        // swept) nodes are silently ignored.
+        NodeControllerState ns = nodeRegistry.get(id);
+        if (ns != null) {
+            ns.notifyHeartbeat();
+        }
+    }
+
+    // Removes the node from the registry and tells the job manager to reschedule
+    // its work. NOTE(review): not synchronized itself — the only caller
+    // (DeadNodeSweeper) invokes it while holding the service monitor.
+    private void killNode(String nodeId) throws Exception {
+        nodeRegistry.remove(nodeId);
+        jobManager.notifyNodeFailure(nodeId);
+    }
+
+    /**
+     * Timer task that runs every heartbeat period: any node whose heartbeat has
+     * lapsed for at least maxHeartbeatLapsePeriods sweeps is declared dead and
+     * removed via killNode. Runs entirely under the service monitor.
+     */
+    private class DeadNodeSweeper extends TimerTask {
+        @Override
+        public void run() {
+            Set<String> deadNodes = new HashSet<String>();
+            synchronized (ClusterControllerService.this) {
+                // First pass: collect dead ids (avoids mutating the registry
+                // while iterating over it).
+                for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+                    NodeControllerState state = e.getValue();
+                    if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+                        deadNodes.add(e.getKey());
+                    }
+                }
+                // Second pass: kill them; a failure killing one node must not
+                // prevent the rest from being reaped.
+                for (String deadNode : deadNodes) {
+                    try {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Killing node: " + deadNode);
+                        }
+                        killNode(deadNode);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * A unit of work to execute on a specific node controller, returning T.
+     * Used by runRemote to fan an operation out across the cluster.
+     */
+    interface RemoteOp<T> {
+        public String getNodeId();
+
+        public T execute(INodeController node) throws Exception;
+    }
+
+    /**
+     * Folds the per-node results of a fan-out (see runRemote) into a single
+     * value of type R. accumulate is called under runRemote's synchronization.
+     */
+    interface Accumulator<T, R> {
+        public void accumulate(T o);
+
+        public R getResult();
+    }
+
+    // Fans remoteOps out across the cluster in parallel on the task executor,
+    // waits for all of them, folds results through the (optional) accumulator,
+    // and rethrows the first failure if any op failed.
+    <T, R> R runRemote(final RemoteOp<T>[] remoteOps, final Accumulator<T, R> accumulator) throws Exception {
+        // Semaphore starts with one permit per op; each launch takes a permit
+        // and each completion returns it, so re-acquiring all permits at the
+        // end is equivalent to "wait for every op to finish".
+        final Semaphore installComplete = new Semaphore(remoteOps.length);
+        // Vector: thread-safe collection of failures from worker threads.
+        final List<Exception> errors = new Vector<Exception>();
+        for (final RemoteOp<T> remoteOp : remoteOps) {
+            NodeControllerState nodeState = lookupNode(remoteOp.getNodeId());
+            final INodeController node = nodeState.getNodeController();
+
+            installComplete.acquire();
+            Runnable remoteRunner = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        T t = remoteOp.execute(node);
+                        if (accumulator != null) {
+                            // Serialize accumulation across worker threads.
+                            synchronized (accumulator) {
+                                accumulator.accumulate(t);
+                            }
+                        }
+                    } catch (Exception e) {
+                        errors.add(e);
+                    } finally {
+                        installComplete.release();
+                    }
+                }
+            };
+
+            getExecutor().execute(remoteRunner);
+        }
+        // Barrier: blocks until every launched op has released its permit.
+        installComplete.acquire(remoteOps.length);
+        if (!errors.isEmpty()) {
+            // Only the first error is propagated; the rest are dropped.
+            throw errors.get(0);
+        }
+        return accumulator == null ? null : accumulator.getResult();
+    }
+
+    /**
+     * RemoteOp for distribution phase 1: initializes the joblet on one node and
+     * collects the port-to-endpoint map that phase 2 will broadcast globally.
+     */
+    static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
+        private String nodeId;
+        private UUID jobId;
+        private JobPlan plan;
+        private UUID stageId;
+        private int attempt;
+        private Map<ActivityNodeId, Set<Integer>> tasks;
+        private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
+
+        public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
+                Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.plan = plan;
+            this.stageId = stageId;
+            this.attempt = attempt;
+            this.tasks = tasks;
+            this.opPartitions = opPartitions;
+        }
+
+        @Override
+        public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
+            return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks, opPartitions);
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 1";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /**
+     * Remote operation that runs phase 2 of joblet initialization on one node,
+     * delivering the merged global port map produced during phase 1.
+     */
+    static class Phase2Installer implements RemoteOp<Void> {
+        // Assigned once in the constructor; final for immutability.
+        private final String nodeId;
+        private final UUID jobId;
+        private final JobPlan plan;
+        private final UUID stageId;
+        private final Map<ActivityNodeId, Set<Integer>> tasks;
+        private final Map<OperatorDescriptorId, Set<Integer>> opPartitions;
+        private final Map<PortInstanceId, Endpoint> globalPortMap;
+
+        public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
+                Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+                Map<PortInstanceId, Endpoint> globalPortMap) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.plan = plan;
+            this.stageId = stageId;
+            this.tasks = tasks;
+            this.opPartitions = opPartitions;
+            this.globalPortMap = globalPortMap;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.initializeJobletPhase2(jobId, plan, stageId, tasks, opPartitions, globalPortMap);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 2";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /**
+     * Remote operation that commits joblet initialization (phase 3) on one node.
+     */
+    static class Phase3Installer implements RemoteOp<Void> {
+        // Assigned once in the constructor; final for immutability.
+        private final String nodeId;
+        private final UUID jobId;
+        private final UUID stageId;
+
+        public Phase3Installer(String nodeId, UUID jobId, UUID stageId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.commitJobletInitialization(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 3";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /**
+     * Remote operation that starts an already-installed stage on one node.
+     */
+    static class StageStarter implements RemoteOp<Void> {
+        // Assigned once in the constructor; final for immutability.
+        private final String nodeId;
+        private final UUID jobId;
+        private final UUID stageId;
+
+        public StageStarter(String nodeId, UUID jobId, UUID stageId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.startStage(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Started Stage: " + stageId;
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /**
+     * Remote operation that aborts a stage's joblet on one node.
+     */
+    static class JobletAborter implements RemoteOp<Void> {
+        // Assigned once in the constructor; final for immutability.
+        private final String nodeId;
+        private final UUID jobId;
+        private final UUID stageId;
+        // Fix: the attempt argument was previously accepted but silently discarded;
+        // it is now retained and surfaced in toString() for diagnostics.
+        private final int attempt;
+
+        public JobletAborter(String nodeId, UUID jobId, UUID stageId, int attempt) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+            this.attempt = attempt;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            // INodeController.abortJoblet() does not take the attempt number.
+            node.abortJoblet(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Aborting (attempt " + attempt + ")";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /**
+     * Remote operation that tells one node to clean up all state for a finished job.
+     */
+    static class JobCompleteNotifier implements RemoteOp<Void> {
+        // Assigned once in the constructor; final for immutability.
+        private final String nodeId;
+        private final UUID jobId;
+
+        public JobCompleteNotifier(String nodeId, UUID jobId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.cleanUpJob(jobId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Cleaning Up";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    /**
+     * Accumulator that merges the per-node port maps returned by Phase1Installer
+     * operations into one global port-instance-to-endpoint map.
+     *
+     * Thread-safety: the caller synchronizes on the accumulator instance before
+     * invoking accumulate() (see runRemote), so no internal locking is needed.
+     */
+    static class PortMapMergingAccumulator implements
+            Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
+        // Final: the map instance is created once and only mutated via accumulate().
+        final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+
+        @Override
+        public void accumulate(Map<PortInstanceId, Endpoint> o) {
+            portMap.putAll(o);
+        }
+
+        @Override
+        public Map<PortInstanceId, Endpoint> getResult() {
+            return portMap;
+        }
+    }
+
+    // NOTE(review): unimplemented stub -- application deployment support not yet added.
+    @Override
+    public void createApplication(String appName) throws Exception {
+
+    }
+
+    // NOTE(review): unimplemented stub -- application teardown support not yet added.
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+
+    }
+
+    // NOTE(review): unimplemented stub -- application start support not yet added.
+    @Override
+    public void startApplication(String appName) throws Exception {
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IClusterController.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IClusterController.java
new file mode 100644
index 0000000..f59e712
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IClusterController.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.rmi.Remote;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.controller.NodeParameters;
+import edu.uci.ics.hyracks.controller.nodecontroller.INodeController;
+
+/**
+ * Interface exposed by the cluster controller to node controllers (registration,
+ * heartbeats, and stagelet status notifications) in addition to the client-facing
+ * {@link IHyracksClientInterface} operations. Extends {@link Remote} so it can be
+ * exported over RMI.
+ */
+public interface IClusterController extends Remote, IHyracksClientInterface {
+    /** Registers a node controller and returns its bootstrap parameters. */
+    public NodeParameters registerNode(INodeController nodeController) throws Exception;
+
+    /** Removes a previously registered node controller. */
+    public void unregisterNode(INodeController nodeController) throws Exception;
+
+    /** Reports successful completion of a stagelet attempt on the given node. */
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+            StageletStatistics statistics) throws Exception;
+
+    /** Reports failure of a stagelet attempt on the given node. */
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
+    /** Periodic liveness signal from the node with the given id. */
+    public void nodeHeartbeat(String id) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
new file mode 100644
index 0000000..e364423
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+/**
+ * Internal job-management contract of the cluster controller: job creation and
+ * start, stagelet/node status notifications, and job status queries.
+ */
+public interface IJobManager {
+    /** Creates (but does not start) a job from the given specification; returns its id. */
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    /** Starts execution of a previously created job. */
+    public void start(UUID jobId) throws Exception;
+
+    /** Records successful completion of a stagelet attempt on a node. */
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+        StageletStatistics statistics) throws Exception;
+
+    /** Records failure of a stagelet attempt on a node. */
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
+    /** Returns the current status of the job, or null if the job is unknown. */
+    public JobStatus getJobStatus(UUID jobId);
+
+    /** Blocks until the job finishes and returns its statistics. */
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+
+    /** Informs the manager that a node has failed. */
+    public void notifyNodeFailure(String nodeId) throws Exception;
+
+    /** Informs the manager that a node has joined the cluster. */
+    public void registerNode(String nodeId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
new file mode 100644
index 0000000..11d5ba5
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -0,0 +1,1052 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+import jol.core.Runtime;
+import jol.types.basic.BasicTupleSet;
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+import jol.types.exception.BadKeyException;
+import jol.types.exception.UpdateException;
+import jol.types.table.BasicTable;
+import jol.types.table.EventTable;
+import jol.types.table.Function;
+import jol.types.table.Key;
+import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.comm.Endpoint;
+import edu.uci.ics.hyracks.job.JobPlan;
+
+public class JOLJobManagerImpl implements IJobManager {
+    private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
+
+    // JOL program scope under which all scheduler tables/rules are registered.
+    public static final String JOL_SCOPE = "hyrackscc";
+
+    // Classpath location of the declarative (Overlog) scheduler program.
+    private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
+
+    private final Runtime jolRuntime;
+
+    // Single-consumer queue: table callbacks enqueue work here so remote calls
+    // run on the job queue thread, not inside JOL evaluation.
+    private final LinkedBlockingQueue<Runnable> jobQueue;
+
+    // JOL tables backing the declarative scheduler's state.
+    private final JobTable jobTable;
+
+    private final JobQueueThread jobQueueThread;
+
+    private final OperatorDescriptorTable odTable;
+
+    private final OperatorLocationTable olTable;
+
+    private final OperatorCloneCountTable ocTable;
+
+    private final ConnectorDescriptorTable cdTable;
+
+    private final ActivityNodeTable anTable;
+
+    private final ActivityConnectionTable acTable;
+
+    private final ActivityBlockedTable abTable;
+
+    private final JobStartTable jobStartTable;
+
+    private final JobCleanUpTable jobCleanUpTable;
+
+    private final JobCleanUpCompleteTable jobCleanUpCompleteTable;
+
+    private final StartMessageTable startMessageTable;
+
+    private final StageletCompleteTable stageletCompleteTable;
+
+    private final StageletFailureTable stageletFailureTable;
+
+    private final AvailableNodesTable availableNodesTable;
+
+    private final RankedAvailableNodesTable rankedAvailableNodesTable;
+
+    private final FailedNodesTable failedNodesTable;
+
+    private final AbortMessageTable abortMessageTable;
+
+    private final AbortNotifyTable abortNotifyTable;
+
+    private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;
+
+    private final List<String> rankedAvailableNodes;
+
+    /**
+     * Wires up the JOL tables and callbacks that drive job scheduling, then
+     * installs and evaluates the declarative scheduler program. Insertions made
+     * by the scheduler fire the callbacks below, which enqueue the actual remote
+     * work onto the single job-queue thread.
+     */
+    public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
+        this.jolRuntime = jolRuntime;
+        jobQueue = new LinkedBlockingQueue<Runnable>();
+        jobQueueThread = new JobQueueThread();
+        jobQueueThread.start();
+
+        this.jobTable = new JobTable(jolRuntime);
+        this.odTable = new OperatorDescriptorTable(jolRuntime);
+        this.olTable = new OperatorLocationTable(jolRuntime);
+        this.ocTable = new OperatorCloneCountTable(jolRuntime);
+        this.cdTable = new ConnectorDescriptorTable(jolRuntime);
+        this.anTable = new ActivityNodeTable(jolRuntime);
+        this.acTable = new ActivityConnectionTable(jolRuntime);
+        this.abTable = new ActivityBlockedTable(jolRuntime);
+        this.jobStartTable = new JobStartTable();
+        this.jobCleanUpTable = new JobCleanUpTable(jolRuntime);
+        this.jobCleanUpCompleteTable = new JobCleanUpCompleteTable();
+        this.startMessageTable = new StartMessageTable(jolRuntime);
+        this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
+        this.stageletFailureTable = new StageletFailureTable(jolRuntime);
+        this.availableNodesTable = new AvailableNodesTable(jolRuntime);
+        this.rankedAvailableNodesTable = new RankedAvailableNodesTable(jolRuntime);
+        this.failedNodesTable = new FailedNodesTable(jolRuntime);
+        this.abortMessageTable = new AbortMessageTable(jolRuntime);
+        this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
+        this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction();
+        this.rankedAvailableNodes = new ArrayList<String>();
+
+        jolRuntime.catalog().register(jobTable);
+        jolRuntime.catalog().register(odTable);
+        jolRuntime.catalog().register(olTable);
+        jolRuntime.catalog().register(ocTable);
+        jolRuntime.catalog().register(cdTable);
+        jolRuntime.catalog().register(anTable);
+        jolRuntime.catalog().register(acTable);
+        jolRuntime.catalog().register(abTable);
+        jolRuntime.catalog().register(jobStartTable);
+        jolRuntime.catalog().register(jobCleanUpTable);
+        jolRuntime.catalog().register(jobCleanUpCompleteTable);
+        jolRuntime.catalog().register(startMessageTable);
+        jolRuntime.catalog().register(stageletCompleteTable);
+        jolRuntime.catalog().register(stageletFailureTable);
+        jolRuntime.catalog().register(availableNodesTable);
+        jolRuntime.catalog().register(rankedAvailableNodesTable);
+        jolRuntime.catalog().register(failedNodesTable);
+        jolRuntime.catalog().register(abortMessageTable);
+        jolRuntime.catalog().register(abortNotifyTable);
+        jolRuntime.catalog().register(expandPartitionCountConstraintFunction);
+
+        // Wakes up threads blocked in waiting on jobTable whenever job rows change.
+        // NOTE(review): Object.notifyAll() requires holding the target's monitor;
+        // these callbacks do not synchronize on jobTable, so this throws
+        // IllegalMonitorStateException unless JOL invokes callbacks while holding
+        // that lock -- verify against the JOL runtime.
+        jobTable.register(new JobTable.Callback() {
+            @Override
+            public void deletion(TupleSet arg0) {
+                jobTable.notifyAll();
+            }
+
+            @Override
+            public void insertion(TupleSet arg0) {
+                jobTable.notifyAll();
+            }
+        });
+
+        // Fired when the scheduler emits a "start stage" message: drives the
+        // three-phase joblet installation protocol and then starts the stage.
+        startMessageTable.register(new StartMessageTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                // Tuple layout: (jobId, stageId, attempt, plan, per-node activity info).
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                UUID stageId = (UUID) data[1];
+                                Integer attempt = (Integer) data[2];
+                                JobPlan plan = (JobPlan) data[3];
+                                Set<List> ts = (Set<List>) data[4];
+                                // Pass 1: collect, for every operator, the set of partitions it runs as.
+                                Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+                                        if (opParts == null) {
+                                            opParts = new HashSet<Integer>();
+                                            opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+                                        }
+                                        opParts.add((Integer) lData[1]);
+                                    }
+                                }
+                                // Pass 2: build one Phase1Installer per node with its activity->partitions map.
+                                ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+                                        .size()];
+                                int i = 0;
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> aParts = tasks.get(aid);
+                                        if (aParts == null) {
+                                            aParts = new HashSet<Integer>();
+                                            tasks.put(aid, aParts);
+                                        }
+                                        aParts.add((Integer) lData[1]);
+                                    }
+                                    p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
+                                            plan, stageId, attempt, tasks, opPartitions);
+                                }
+                                // Phase 1 returns each node's port map; merge into the global map.
+                                LOGGER.info("Stage start - Phase 1");
+                                Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+                                        new ClusterControllerService.PortMapMergingAccumulator());
+
+                                ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+                                        .size()];
+                                ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+                                        .size()];
+                                ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+                                        .size()];
+                                i = 0;
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> aParts = tasks.get(aid);
+                                        if (aParts == null) {
+                                            aParts = new HashSet<Integer>();
+                                            tasks.put(aid, aParts);
+                                        }
+                                        aParts.add((Integer) lData[1]);
+                                    }
+                                    p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
+                                            plan, stageId, tasks, opPartitions, globalPortMap);
+                                    p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+                                            stageId);
+                                    ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
+                                            stageId);
+                                    ++i;
+                                }
+                                LOGGER.info("Stage start - Phase 2");
+                                ccs.runRemote(p2is, null);
+                                LOGGER.info("Stage start - Phase 3");
+                                ccs.runRemote(p3is, null);
+                                LOGGER.info("Stage start");
+                                ccs.runRemote(ss, null);
+                                LOGGER.info("Stage started");
+                            } catch (Exception e) {
+                                // NOTE(review): exception swallowed -- a failed stage start is
+                                // neither logged nor reported back to the scheduler.
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
+        // Fired when a job is ready for cleanup: notifies every involved node,
+        // then records clean-up completion back into JOL (even on failure).
+        jobCleanUpTable.register(new JobCleanUpTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                // Tuple layout: (jobId, set of node ids to clean up).
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                Set<String> ts = (Set<String>) data[1];
+                                ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+                                        .size()];
+                                int i = 0;
+                                for (String n : ts) {
+                                    jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+                                }
+                                try {
+                                    ccs.runRemote(jcns, null);
+                                } finally {
+                                    // Always record completion so the scheduler can make progress.
+                                    BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable
+                                            .createTuple(jobId));
+                                    jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+                                    jolRuntime.evaluate();
+                                }
+                            } catch (Exception e) {
+                                // NOTE(review): exception swallowed -- cleanup failures go unreported.
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
+        // Fired when a stage attempt must be aborted: aborts the joblet on each
+        // involved node and schedules abort notifications back into JOL.
+        abortMessageTable.register(new AbortMessageTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                // Tuple layout: (jobId, stageId, attempt, _, per-node info at index 4).
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                UUID stageId = (UUID) data[1];
+                                Integer attempt = (Integer) data[2];
+                                Set<List> ts = (Set<List>) data[4];
+                                ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+                                        .size()];
+                                int i = 0;
+                                BasicTupleSet notificationTuples = new BasicTupleSet();
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    String nodeId = (String) t2Data[0];
+                                    jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId,
+                                            attempt);
+                                    notificationTuples.add(AbortNotifyTable
+                                            .createTuple(jobId, stageId, nodeId, attempt));
+                                }
+                                try {
+                                    ccs.runRemote(jas, null);
+                                } finally {
+                                    // Always notify the scheduler, even if some aborts failed remotely.
+                                    jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples,
+                                            null);
+                                    jolRuntime.evaluate();
+                                }
+                            } catch (Exception e) {
+                                // NOTE(review): exception swallowed -- abort failures go unreported.
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
+        // Install the declarative scheduler program and run an initial evaluation.
+        jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Builds the activity graph and constraint tuples for the given job
+     * specification, feeds them into the JOL scheduler tables, and returns the
+     * new job's id. The job is not started until start(jobId) is invoked.
+     */
+    @Override
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        final UUID jobId = UUID.randomUUID();
+
+        final JobPlanBuilder builder = new JobPlanBuilder();
+        builder.init(jobSpec, jobFlags);
+
+        // Graph builder that mirrors every activity/edge into JOL tuple sets
+        // while also accumulating the in-memory JobPlan via 'builder'.
+        final BasicTupleSet anTuples = new BasicTupleSet();
+        final BasicTupleSet acTuples = new BasicTupleSet();
+        final BasicTupleSet abTuples = new BasicTupleSet();
+        IActivityGraphBuilder gBuilder = new IActivityGraphBuilder() {
+            @Override
+            public void addTask(IActivityNode task) {
+                anTuples.add(ActivityNodeTable.createTuple(jobId, task));
+                builder.addTask(task);
+            }
+
+            @Override
+            public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+                acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
+                        taskOutputIndex));
+                builder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex);
+            }
+
+            @Override
+            public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+                acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
+                        taskInputIndex));
+                builder.addSourceEdge(operatorInputIndex, task, taskInputIndex);
+            }
+
+            @Override
+            public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+                abTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked));
+                builder.addBlockingEdge(blocker, blocked);
+            }
+        };
+
+        // Operator descriptors plus their partition/location constraints.
+        BasicTupleSet odTuples = new BasicTupleSet();
+        BasicTupleSet olTuples = new BasicTupleSet();
+        BasicTupleSet ocTuples = new BasicTupleSet();
+        for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
+            IOperatorDescriptor od = e.getValue();
+            int nPartitions = addPartitionConstraintTuples(jobId, od, olTuples, ocTuples);
+            odTuples.add(OperatorDescriptorTable.createTuple(jobId, nPartitions, od));
+            od.contributeTaskGraph(gBuilder);
+        }
+
+        BasicTupleSet cdTuples = new BasicTupleSet();
+        for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : jobSpec.getConnectorMap().entrySet()) {
+            cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
+        }
+
+        BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobSpec, builder.getPlan()));
+
+        // Schedule all tuple sets into their tables, then run one evaluation round.
+        jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, olTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorCloneCountTable.TABLE_NAME, ocTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, abTuples, null);
+
+        jolRuntime.evaluate();
+
+        return jobId;
+    }
+
+    /**
+     * Translates the operator's partition constraint into JOL tuples and returns
+     * the number of partitions the operator will run as.
+     *
+     * COUNT constraints emit one clone-count tuple; EXPLICIT constraints emit one
+     * location tuple per partition.
+     *
+     * @throws IllegalArgumentException for any other constraint type.
+     */
+    private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
+            BasicTupleSet ocTuples) {
+        PartitionConstraint pc = od.getPartitionConstraint();
+
+        switch (pc.getPartitionConstraintType()) {
+            case COUNT:
+                int count = ((PartitionCountConstraint) pc).getCount();
+                ocTuples.add(OperatorCloneCountTable.createTuple(jobId, od.getOperatorId(), count));
+                return count;
+
+            case EXPLICIT:
+                LocationConstraint[] locationConstraints = ((ExplicitPartitionConstraint) pc).getLocationConstraints();
+                for (int i = 0; i < locationConstraints.length; ++i) {
+                    addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i], 0);
+                }
+                return locationConstraints.length;
+        }
+        // Fix: include the offending type in the message for diagnosability
+        // (was a bare IllegalArgumentException with no context).
+        throw new IllegalArgumentException("Unsupported partition constraint type: "
+                + pc.getPartitionConstraintType());
+    }
+
+    /**
+     * Recursively expands a single location constraint for partition {@code i} into
+     * operatorlocation tuples. ABSOLUTE constraints yield one tuple; CHOICE constraints
+     * yield one tuple per alternative, each alternative one benefit unit lower than the last.
+     */
+    private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
+            LocationConstraint locationConstraint, int benefit) {
+        switch (locationConstraint.getConstraintType()) {
+            case ABSOLUTE:
+                String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
+                olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i, benefit));
+                break;
+
+            case CHOICE:
+                int rank = 0;
+                for (LocationConstraint choice : ((ChoiceLocationConstraint) locationConstraint).getChoices()) {
+                    // Earlier choices keep a higher benefit; later ones are progressively penalized.
+                    addLocationConstraintTuple(olTuples, jobId, opId, i, choice, benefit - rank);
+                    ++rank;
+                }
+                break;
+        }
+    }
+
+    /**
+     * Returns the current status of the given job, or {@code null} if the job is unknown.
+     * Status lives in column 1 of the job tuple.
+     */
+    @Override
+    public JobStatus getJobStatus(UUID jobId) {
+        synchronized (jobTable) {
+            Tuple jobTuple;
+            try {
+                jobTuple = jobTable.lookupJob(jobId);
+            } catch (BadKeyException e) {
+                throw new RuntimeException(e);
+            }
+            return jobTuple == null ? null : (JobStatus) jobTuple.value(1);
+        }
+    }
+
+    /**
+     * Handles the failure of a node: removes it from the ranked-available list (shifting
+     * later nodes down one rank), retracts its availability tuples, and then records the
+     * failure so the scheduler's recovery rules can fire.
+     *
+     * @param nodeId id of the failed node; a no-op if the node was never registered
+     */
+    @Override
+    public synchronized void notifyNodeFailure(String nodeId) throws Exception {
+        // List.indexOf replaces the original hand-rolled equality scan.
+        int delIndex = rankedAvailableNodes.indexOf(nodeId);
+        if (delIndex < 0) {
+            return;
+        }
+        int len = rankedAvailableNodes.size();
+
+        // Tuple to retract: the failed node at its current rank.
+        BasicTupleSet delRANTuples = new BasicTupleSet();
+        delRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, delIndex));
+
+        // Re-insert every node after the failed one with its rank decremented by one.
+        BasicTupleSet insRANTuples = new BasicTupleSet();
+        for (int i = delIndex + 1; i < len; ++i) {
+            insRANTuples.add(RankedAvailableNodesTable.createTuple(rankedAvailableNodes.get(i), i - 1));
+        }
+
+        rankedAvailableNodes.remove(delIndex);
+
+        jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, delRANTuples);
+
+        BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples);
+
+        jolRuntime.evaluate();
+
+        // NOTE(review): the failure tuple is scheduled in a second evaluation pass --
+        // presumably so derived rules observe the availability retraction first; confirm.
+        BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Publishes a stagelet-completion event for the given job/stage/node/attempt, carrying
+     * the stagelet's statistics, and runs the JOL program to react to it.
+     */
+    @Override
+    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+            StageletStatistics statistics) throws Exception {
+        BasicTupleSet scTuples = new BasicTupleSet(
+                StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
+
+        jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, scTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Publishes a stagelet-failure event for the given job/stage/node/attempt and runs the
+     * JOL program so recovery rules can react.
+     */
+    @Override
+    public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId)
+            throws Exception {
+        BasicTupleSet sfTuples = new BasicTupleSet(
+                StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
+
+        jolRuntime.schedule(JOL_SCOPE, StageletFailureTable.TABLE_NAME, sfTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Starts a previously created job by publishing a jobstart event stamped with the
+     * current wall-clock time, then evaluating the JOL program.
+     */
+    @Override
+    public void start(UUID jobId) throws Exception {
+        BasicTupleSet jsTuples = new BasicTupleSet(
+                JobStartTable.createTuple(jobId, System.currentTimeMillis()));
+
+        jolRuntime.schedule(JOL_SCOPE, JobStartTable.TABLE_NAME, jsTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Registers a node with the cluster controller: appends it at the end of the ranking,
+     * marks it available, and retracts any stale failednodes record for it.
+     */
+    @Override
+    public synchronized void registerNode(String nodeId) throws Exception {
+        rankedAvailableNodes.add(nodeId);
+        BasicTupleSet insRANTuples = new BasicTupleSet(
+                RankedAvailableNodesTable.createTuple(nodeId, rankedAvailableNodes.size() - 1));
+
+        jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, null);
+
+        BasicTupleSet availableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
+
+        jolRuntime.evaluate();
+
+        // A re-registering node is no longer failed: retract it in a second evaluation pass.
+        BasicTupleSet unfailedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, null, unfailedTuples);
+
+        jolRuntime.evaluate();
+    }
+
+    /**
+     * Blocks until the job either disappears from the job table or its status (column 1)
+     * becomes TERMINATED, then returns its aggregated statistics ({@code null} if unknown).
+     * Waits on the jobTable monitor; the corresponding notifyAll is issued by whichever
+     * code updates job tuples -- not visible in this chunk, TODO confirm the notifier.
+     */
+    @Override
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+        synchronized (jobTable) {
+            Tuple jobTuple = null;
+            // Reference comparison against the JobStatus enum constant is intentional.
+            while ((jobTuple = jobTable.lookupJob(jobId)) != null && jobTuple.value(1) != JobStatus.TERMINATED) {
+                jobTable.wait();
+            }
+            return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
+        }
+    }
+
+    /*
+     * declare(job, keys(0), {JobId, Status, JobSpec, JobPlan, Set<Set<StageletStatistics>>})
+     *
+     * One row per submitted job. The last column accumulates per-stage sets of stagelet
+     * statistics used by buildJobStatistics().
+     */
+    private static class JobTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "job");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, JobStatus.class, JobSpecification.class,
+                JobPlan.class, Set.class };
+
+        public JobTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        // Initial row for a newly submitted job: INITIALIZED status, empty statistics set.
+        @SuppressWarnings("unchecked")
+        static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
+            return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
+        }
+
+        // Folds the per-stage stagelet statistics (column 4) into a single JobStatistics.
+        @SuppressWarnings("unchecked")
+        JobStatistics buildJobStatistics(Tuple jobTuple) {
+            Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
+            JobStatistics stats = new JobStatistics();
+            if (statsSet != null) {
+                for (Set<StageletStatistics> stageStatsSet : statsSet) {
+                    StageStatistics stageStats = new StageStatistics();
+                    for (StageletStatistics stageletStats : stageStatsSet) {
+                        stageStats.addStageletStatistics(stageletStats);
+                    }
+                    stats.addStageStatistics(stageStats);
+                }
+            }
+            return stats;
+        }
+
+        // Returns the job's row, or null if no row exists for this jobId.
+        Tuple lookupJob(UUID jobId) throws BadKeyException {
+            TupleSet set = primary().lookupByKey(jobId);
+            if (set.isEmpty()) {
+                return null;
+            }
+            return (Tuple) set.toArray()[0];
+        }
+    }
+
+    /*
+     * declare(operatordescriptor, keys(0, 1), {JobId, ODId, NPartitions, OperatorDescriptor})
+     *
+     * One row per operator of a job, including the partition count resolved from its
+     * partition constraint.
+     */
+    private static class OperatorDescriptorTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatordescriptor");
+
+        private static Key PRIMARY_KEY = new Key(0, 1);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                IOperatorDescriptor.class };
+
+        public OperatorDescriptorTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, int nPartitions, IOperatorDescriptor od) {
+            return new Tuple(jobId, od.getOperatorId(), nPartitions, od);
+        }
+    }
+
+    /*
+     * declare(operatorlocation, keys(), {JobId, ODId, NodeId, Partition, Benefit})
+     *
+     * Candidate placements for an operator partition. Note the empty Key(): there is no
+     * uniqueness constraint, so several (node, benefit) candidates may exist per partition.
+     */
+    private static class OperatorLocationTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation");
+
+        private static Key PRIMARY_KEY = new Key();
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, String.class,
+                Integer.class, Integer.class };
+
+        public OperatorLocationTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition, int benefit) {
+            return new Tuple(jobId, opId, nodeId, partition, benefit);
+        }
+    }
+
+    /*
+     * declare(operatorclonecount, keys(), {JobId, ODId, Count})
+     *
+     * Clone counts for operators constrained only by a partition COUNT. Note the empty
+     * Key() -- no uniqueness is enforced here (unlike the keys(0, 1) the name suggests).
+     */
+    private static class OperatorCloneCountTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorclonecount");
+
+        private static Key PRIMARY_KEY = new Key();
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class };
+
+        public OperatorCloneCountTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, int cloneCount) {
+            return new Tuple(jobId, opId, cloneCount);
+        }
+    }
+
+    /*
+     * declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor})
+     *
+     * One row per connector of a job, denormalized with its producer/consumer operator
+     * ids and port indices as resolved from the JobSpecification.
+     */
+    private static class ConnectorDescriptorTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "connectordescriptor");
+
+        private static Key PRIMARY_KEY = new Key(0, 1);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, ConnectorDescriptorId.class,
+                OperatorDescriptorId.class, Integer.class, OperatorDescriptorId.class, Integer.class,
+                IConnectorDescriptor.class };
+
+        public ConnectorDescriptorTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, JobSpecification jobSpec, IConnectorDescriptor conn) {
+            IOperatorDescriptor srcOD = jobSpec.getProducer(conn);
+            int srcPort = jobSpec.getProducerOutputIndex(conn);
+            IOperatorDescriptor destOD = jobSpec.getConsumer(conn);
+            int destPort = jobSpec.getConsumerInputIndex(conn);
+            Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort,
+                    destOD.getOperatorId(), destPort, conn);
+            return cdTuple;
+        }
+    }
+
+    /*
+     * declare(activitynode, keys(0, 1, 2), {JobId, OperatorId, ActivityId, ActivityNode})
+     *
+     * One row per activity node contributed by an operator's task graph.
+     */
+    private static class ActivityNodeTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class,
+                ActivityNodeId.class, IActivityNode.class };
+
+        public ActivityNodeTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        // The owning operator id is derived from the activity id itself.
+        static Tuple createTuple(UUID jobId, IActivityNode aNode) {
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(),
+                    aNode);
+        }
+    }
+
+    /*
+     * declare(activityconnection, keys(0, 1, 2, 3), {JobId, OperatorId, OperatorPort, Direction, ActivityNodeId, ActivityPort})
+     *
+     * Maps an operator-level port (in the given Direction) to the activity-level port
+     * that services it.
+     */
+    private static class ActivityConnectionTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                Direction.class, ActivityNodeId.class, Integer.class };
+
+        public ActivityConnectionTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) {
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction,
+                    aNode.getActivityNodeId(), activityPort);
+        }
+    }
+
+    /*
+     * declare(activityblocked, keys(0, 1, 2, 3, 4),
+     *         {JobId, BlockerOperatorId, BlockerActivityId, BlockedOperatorId, BlockedActivityId})
+     *
+     * Blocking edges between activities: the blocked activity may not start until the
+     * blocker has completed.
+     */
+    private static class ActivityBlockedTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class,
+                ActivityNodeId.class, OperatorDescriptorId.class, ActivityNodeId.class };
+
+        public ActivityBlockedTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) {
+            ActivityNodeId blockerANId = blocker.getActivityNodeId();
+            OperatorDescriptorId blockerODId = blockerANId.getOperatorDescriptorId();
+            ActivityNodeId blockedANId = blocked.getActivityNodeId();
+            OperatorDescriptorId blockedODId = blockedANId.getOperatorDescriptorId();
+            return new Tuple(jobId, blockerODId, blockerANId, blockedODId, blockedANId);
+        }
+    }
+
+    /*
+     * declare(jobstart, {JobId, SubmitTime})
+     *
+     * Event table (no key): fired once per start(jobId) call with the wall-clock
+     * submission time in milliseconds.
+     */
+    private static class JobStartTable extends EventTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, Long.class };
+
+        public JobStartTable() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, long submitTime) {
+            return new Tuple(jobId, submitTime);
+        }
+    }
+
+    /*
+     * declare(startmessage, keys(0, 1, 2), {JobId, StageId, Attempt, JobPlan, TupleSet})
+     *
+     * Stage-start messages produced by the scheduler rules; rows are only created/read
+     * by the JOL program (no createTuple helper here).
+     */
+    private static class StartMessageTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
+                Set.class };
+
+        public StartMessageTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(jobcleanup, keys(0), {JobId, Set<NodeId>})
+     *
+     * Tracks the set of nodes that must be cleaned up for a finished job; populated and
+     * consumed by the JOL rules (no createTuple helper here).
+     */
+    private static class JobCleanUpTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanup");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, Set.class };
+
+        public JobCleanUpTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(jobcleanupcomplete, {JobId})
+     *
+     * Event table (no key): signals that cleanup for a job has finished on all nodes.
+     */
+    private static class JobCleanUpCompleteTable extends EventTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete");
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class };
+
+        public JobCleanUpCompleteTable() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId) {
+            return new Tuple(jobId);
+        }
+    }
+
+    /*
+     * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
+     *
+     * Completion records for stagelets, keyed by job/stage/node/attempt so retried
+     * attempts are tracked separately.
+     */
+    private static class StageletCompleteTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class,
+                StageletStatistics.class };
+
+        public StageletCompleteTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
+                StageletStatistics statistics) {
+            return new Tuple(jobId, stageId, nodeId, attempt, statistics);
+        }
+    }
+
+    /*
+     * declare(stageletfailure, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+     *
+     * Failure records for stagelets, keyed by job/stage/node/attempt.
+     */
+    private static class StageletFailureTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletfailure");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class };
+
+        public StageletFailureTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+            return new Tuple(jobId, stageId, nodeId, attempt);
+        }
+    }
+
+    /*
+     * declare(availablenodes, keys(0), {NodeId})
+     *
+     * The set of currently available (registered, non-failed) node ids.
+     */
+    private static class AvailableNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "availablenodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { String.class };
+
+        public AvailableNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId) {
+            return new Tuple(nodeId);
+        }
+    }
+
+    /*
+     * declare(rankedavailablenodes, keys(0), {NodeId, Rank})
+     *
+     * Available nodes with a dense integer rank; mirrors the in-memory
+     * rankedAvailableNodes list maintained by registerNode/notifyNodeFailure.
+     */
+    private static class RankedAvailableNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "rankedavailablenodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { String.class, Integer.class };
+
+        public RankedAvailableNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId, int rank) {
+            return new Tuple(nodeId, rank);
+        }
+    }
+
+    /*
+     * declare(failednodes, keys(0), {NodeId})
+     *
+     * Node ids known to have failed; retracted again when the node re-registers.
+     */
+    private static class FailedNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "failednodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { String.class };
+
+        public FailedNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId) {
+            return new Tuple(nodeId);
+        }
+    }
+
+    /*
+     * declare(abortmessage, keys(0, 1, 2), {JobId, StageId, Attempt, JobPlan, TupleSet})
+     *
+     * Abort messages produced by the scheduler rules for a failed stage attempt; rows are
+     * only created/read by the JOL program (no createTuple helper here).
+     */
+    private static class AbortMessageTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortmessage");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
+                Set.class };
+
+        public AbortMessageTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(abortnotify, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+     *
+     * Acknowledgements that a node has aborted a stage attempt (no statistics column,
+     * unlike stageletcomplete).
+     */
+    private static class AbortNotifyTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortnotify");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class };
+
+        public AbortNotifyTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+            return new Tuple(jobId, stageId, nodeId, attempt);
+        }
+    }
+
+    /*
+     * Table function expandpartitioncountconstraint:
+     * input rows {JobId, ODId, NPartitions, _} are expanded into one output row
+     * {JobId, ODId, Partition, Ordinal} per partition index 0..NPartitions-1, where
+     * Ordinal is a running counter across all input rows of one insert() call.
+     */
+    private static class ExpandPartitionCountConstraintTableFunction extends Function {
+        private static final String TABLE_NAME = "expandpartitioncountconstraint";
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                Integer.class };
+
+        public ExpandPartitionCountConstraintTableFunction() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        @Override
+        public TupleSet insert(TupleSet tuples, TupleSet conflicts) throws UpdateException {
+            TupleSet result = new BasicTupleSet();
+            // Running ordinal shared across all expanded rows of this call.
+            int counter = 0;
+            for (Tuple t : tuples) {
+                int nPartitions = (Integer) t.value(2);
+                for (int i = 0; i < nPartitions; ++i) {
+                    result.add(new Tuple(t.value(0), t.value(1), i, counter++));
+                }
+            }
+            return result;
+        }
+    }
+
+    /**
+     * Daemon thread that drains jobQueue forever, running each task in order.
+     * Task failures are logged to stderr and do not stop the loop.
+     */
+    private class JobQueueThread extends Thread {
+        public JobQueueThread() {
+            // Daemon: must not keep the JVM alive on shutdown.
+            setDaemon(true);
+        }
+
+        public void run() {
+            Runnable r;
+            while (true) {
+                try {
+                    r = jobQueue.take();
+                } catch (InterruptedException e) {
+                    // Deliberate: interruption is not a shutdown signal here; keep draining.
+                    continue;
+                }
+                try {
+                    r.run();
+                } catch (Exception e) {
+                    // A failing task must not kill the queue thread.
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
new file mode 100644
index 0000000..8781233
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.job.JobPlan;
+
+/**
+ * Collects the activity graph contributed by a job's operators into a {@link JobPlan}.
+ * Call {@link #init} first; operators then invoke the IActivityGraphBuilder callbacks,
+ * and the finished plan is read via {@link #getPlan}.
+ */
+public class JobPlanBuilder implements IActivityGraphBuilder {
+    private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
+
+    // The plan under construction; created by init(), returned by getPlan().
+    private JobPlan plan;
+
+    /** Records a blocking dependency in both directions (blocker->blocked and blocked->blocker). */
+    @Override
+    public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+        addToValueSet(plan.getBlocker2BlockedMap(), blocker.getActivityNodeId(), blocked.getActivityNodeId());
+        addToValueSet(plan.getBlocked2BlockerMap(), blocked.getActivityNodeId(), blocker.getActivityNodeId());
+    }
+
+    /** Wires operator input {@code operatorInputIndex} to input {@code taskInputIndex} of {@code task}. */
+    @Override
+    public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
+                + task.getActivityNodeId() + ":" + taskInputIndex);
+        }
+        insertIntoIndexedMap(plan.getTaskInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
+        insertIntoIndexedMap(plan.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex, task
+            .getActivityNodeId());
+    }
+
+    /** Wires operator output {@code operatorOutputIndex} to output {@code taskOutputIndex} of {@code task}. */
+    @Override
+    public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
+                + task.getActivityNodeId() + ":" + taskOutputIndex);
+        }
+        insertIntoIndexedMap(plan.getTaskOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
+        insertIntoIndexedMap(plan.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex, task
+            .getActivityNodeId());
+    }
+
+    /** Registers an activity node and associates it with its owning operator. */
+    @Override
+    public void addTask(IActivityNode task) {
+        plan.getActivityNodeMap().put(task.getActivityNodeId(), task);
+        addToValueSet(plan.getOperatorTaskMap(), task.getOwner().getOperatorId(), task.getActivityNodeId());
+    }
+
+    // Adds n2 to the value set of key n1, creating the set on first use.
+    private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
+        Set<V> targets = map.get(n1);
+        if (targets == null) {
+            targets = new HashSet<V>();
+            map.put(n1, targets);
+        }
+        targets.add(n2);
+    }
+
+    // Pads the list with nulls so that index is a valid position for List.set.
+    private <T> void extend(List<T> list, int index) {
+        int n = list.size();
+        for (int i = n; i <= index; ++i) {
+            list.add(null);
+        }
+    }
+
+    /** Starts a fresh plan for the given job specification and flags. */
+    public void init(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+        plan = new JobPlan(jobSpec, jobFlags);
+    }
+
+    // Sets map[key][index] = value, growing the per-key list (null-padded) as needed.
+    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+        List<V> vList = map.get(key);
+        if (vList == null) {
+            vList = new ArrayList<V>();
+            map.put(key, vList);
+        }
+        extend(vList, index);
+        vList.set(index, value);
+    }
+
+    /** Returns the plan assembled since the last {@link #init} call. */
+    public JobPlan getPlan() {
+        return plan;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
new file mode 100644
index 0000000..314c600
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.job.IOperatorDescriptorVisitor;
+import edu.uci.ics.hyracks.job.JobPlan;
+import edu.uci.ics.hyracks.job.JobStage;
+import edu.uci.ics.hyracks.job.PlanUtils;
+
+/**
+ * Translates a JobSpecification into a JobPlan: builds the activity graph via
+ * each operator's contributeTaskGraph(), then partitions the activities into
+ * JobStages by repeatedly merging stages that are connected by a connector,
+ * and finally wires stage dependencies from the plan's blocker relationships.
+ */
+public class JobPlanner {
+    private static final Logger LOGGER = Logger.getLogger(JobPlanner.class.getName());
+
+    /**
+     * Scans all current stages for a pair of activities (one inside a stage, its
+     * connected producer/consumer activity outside it) joined by a connector.
+     * Returns the first such pair, or null when no connector crosses a stage
+     * boundary any more (the merge fixpoint has been reached).
+     */
+    private Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobPlan plan, JobSpecification spec, Set<JobStage> eqSets) {
+        Map<ActivityNodeId, IActivityNode> activityNodeMap = plan.getActivityNodeMap();
+        for (JobStage eqSet : eqSets) {
+            for (ActivityNodeId t : eqSet.getTasks()) {
+                IOperatorDescriptor owner = activityNodeMap.get(t).getOwner();
+                // Check this activity's inputs: the activity producing each input
+                // must live in the same stage.
+                List<Integer> inputList = plan.getTaskInputMap().get(t);
+                if (inputList != null) {
+                    for (Integer idx : inputList) {
+                        IConnectorDescriptor conn = spec.getInputConnectorDescriptor(owner, idx);
+                        OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
+                        int producerOutputIndex = spec.getProducerOutputIndex(conn);
+                        ActivityNodeId inTask = plan.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+                        if (!eqSet.getTasks().contains(inTask)) {
+                            return new Pair<ActivityNodeId, ActivityNodeId>(t, inTask);
+                        }
+                    }
+                }
+                // Symmetric check for this activity's outputs.
+                List<Integer> outputList = plan.getTaskOutputMap().get(t);
+                if (outputList != null) {
+                    for (Integer idx : outputList) {
+                        IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(owner, idx);
+                        OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
+                        int consumerInputIndex = spec.getConsumerInputIndex(conn);
+                        ActivityNodeId outTask = plan.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+                        if (!eqSet.getTasks().contains(outTask)) {
+                            return new Pair<ActivityNodeId, ActivityNodeId>(t, outTask);
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Groups the plan's activities into stages and returns the terminal "end"
+     * stage, which depends on every other stage. Dependencies between stages are
+     * derived from the plan's blocker-to-blocked activity map.
+     */
+    private JobStage inferStages(JobPlan plan) throws Exception {
+        JobSpecification spec = plan.getJobSpecification();
+
+        /*
+         * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
+         */
+        Map<ActivityNodeId, JobStage> stageMap = new HashMap<ActivityNodeId, JobStage>();
+        Set<JobStage> stages = new HashSet<JobStage>();
+        for (Set<ActivityNodeId> taskIds : plan.getOperatorTaskMap().values()) {
+            for (ActivityNodeId taskId : taskIds) {
+                Set<ActivityNodeId> eqSet = new HashSet<ActivityNodeId>();
+                eqSet.add(taskId);
+                JobStage stage = new JobStage(eqSet);
+                stageMap.put(taskId, stage);
+                stages.add(stage);
+            }
+        }
+
+        // Merge connected stages pairwise until no connector crosses a stage
+        // boundary (classic union-find-style fixpoint, one merge per iteration).
+        boolean changed = true;
+        while (changed) {
+            changed = false;
+            Pair<ActivityNodeId, ActivityNodeId> pair = findMergePair(plan, spec, stages);
+            if (pair != null) {
+                merge(stageMap, stages, pair.first, pair.second);
+                changed = true;
+            }
+        }
+
+        // Synthesize an empty end stage that depends on all real stages, and add
+        // blocker-induced dependencies between stages.
+        JobStage endStage = new JobStage(new HashSet<ActivityNodeId>());
+        Map<ActivityNodeId, Set<ActivityNodeId>> blocker2BlockedMap = plan.getBlocker2BlockedMap();
+        for (JobStage s : stages) {
+            endStage.addDependency(s);
+            s.addDependent(endStage);
+            Set<JobStage> blockedStages = new HashSet<JobStage>();
+            for (ActivityNodeId t : s.getTasks()) {
+                Set<ActivityNodeId> blockedTasks = blocker2BlockedMap.get(t);
+                if (blockedTasks != null) {
+                    for (ActivityNodeId bt : blockedTasks) {
+                        blockedStages.add(stageMap.get(bt));
+                    }
+                }
+            }
+            for (JobStage bs : blockedStages) {
+                bs.addDependency(s);
+                s.addDependent(bs);
+            }
+        }
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
+            for (JobStage s : stages) {
+                LOGGER.info(s.toString());
+            }
+            LOGGER.info("SID: ENDSTAGE");
+        }
+        return endStage;
+    }
+
+    /**
+     * Replaces the two stages containing t1 and t2 with a single stage holding
+     * the union of their activities, and repoints every affected activity's
+     * stageMap entry to the merged stage. (Dependencies are not carried over;
+     * they are only computed after merging completes.)
+     */
+    private void merge(Map<ActivityNodeId, JobStage> eqSetMap, Set<JobStage> eqSets, ActivityNodeId t1,
+        ActivityNodeId t2) {
+        JobStage stage1 = eqSetMap.get(t1);
+        Set<ActivityNodeId> s1 = stage1.getTasks();
+        JobStage stage2 = eqSetMap.get(t2);
+        Set<ActivityNodeId> s2 = stage2.getTasks();
+
+        Set<ActivityNodeId> mergedSet = new HashSet<ActivityNodeId>();
+        mergedSet.addAll(s1);
+        mergedSet.addAll(s2);
+
+        eqSets.remove(stage1);
+        eqSets.remove(stage2);
+        JobStage mergedStage = new JobStage(mergedSet);
+        eqSets.add(mergedStage);
+
+        for (ActivityNodeId t : mergedSet) {
+            eqSetMap.put(t, mergedStage);
+        }
+    }
+
+    /**
+     * Builds the complete JobPlan: asks every operator to contribute its
+     * activities/connections, infers the stage DAG, and records the end stage.
+     */
+    public JobPlan plan(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        final JobPlanBuilder builder = new JobPlanBuilder();
+        builder.init(jobSpec, jobFlags);
+        PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) throws Exception {
+                op.contributeTaskGraph(builder);
+            }
+        });
+        JobPlan plan = builder.getPlan();
+        JobStage endStage = inferStages(plan);
+        plan.setEndStage(endStage);
+
+        return plan;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
new file mode 100644
index 0000000..a1dac8c
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import edu.uci.ics.hyracks.controller.nodecontroller.INodeController;
+
+/**
+ * Cluster-controller-side record of a registered node controller: the remote
+ * proxy for reaching the node plus a heartbeat-staleness counter.
+ *
+ * NOTE(review): lastHeartbeatDuration is read/written without synchronization;
+ * presumably heartbeat RPCs and the staleness sweeper are serialized by the
+ * caller -- confirm before relying on this from multiple threads.
+ */
+public class NodeControllerState {
+    private final INodeController nodeController;
+
+    // Staleness counter: reset to 0 on every heartbeat, presumably incremented
+    // once per sweep period by the cluster controller (sweeper not visible here).
+    private int lastHeartbeatDuration;
+
+    public NodeControllerState(INodeController nodeController) {
+        this.nodeController = nodeController;
+    }
+
+    /** Marks the node as fresh; called when a heartbeat arrives. */
+    void notifyHeartbeat() {
+        lastHeartbeatDuration = 0;
+    }
+
+    /** Bumps the staleness counter; returns the value it had BEFORE the increment. */
+    int incrementLastHeartbeatDuration() {
+        return lastHeartbeatDuration++;
+    }
+
+    int getLastHeartbeatDuration() {
+        return lastHeartbeatDuration;
+    }
+
+    public INodeController getNodeController() {
+        return nodeController;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
new file mode 100644
index 0000000..69f7380
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
+
+/**
+ * Tracks completion of a single stage across the set of nodes it runs on, and
+ * holds the stage's statistics object as nodes report in.
+ */
+public class StageProgress {
+    private final UUID stageId;
+
+    /** Nodes that have not yet reported completion for this stage. */
+    private final Set<String> pendingNodes;
+
+    private final StageStatistics stageStatistics;
+
+    public StageProgress(UUID stageId) {
+        this.stageId = stageId;
+        this.pendingNodes = new HashSet<String>();
+        this.stageStatistics = new StageStatistics();
+        this.stageStatistics.setStageId(stageId);
+    }
+
+    public UUID getStageId() {
+        return stageId;
+    }
+
+    /** Registers nodes whose completion this stage must wait for. */
+    public void addPendingNodes(Set<String> nodes) {
+        pendingNodes.addAll(nodes);
+    }
+
+    /** Records that the given node finished its part of the stage. */
+    public void markNodeComplete(String nodeId) {
+        pendingNodes.remove(nodeId);
+    }
+
+    /** True once every registered node has reported completion. */
+    public boolean stageComplete() {
+        return pendingNodes.isEmpty();
+    }
+
+    public StageStatistics getStageStatistics() {
+        return stageStatistics;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/INodeController.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/INodeController.java
new file mode 100644
index 0000000..6eaacb1
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/INodeController.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.nodecontroller;
+
+import java.rmi.Remote;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.comm.Endpoint;
+import edu.uci.ics.hyracks.config.NCConfig;
+import edu.uci.ics.hyracks.controller.NodeCapability;
+import edu.uci.ics.hyracks.controller.clustercontroller.IClusterController;
+import edu.uci.ics.hyracks.job.JobPlan;
+
+/**
+ * RMI interface through which the cluster controller drives a node controller:
+ * registration metadata, two-phase joblet initialization, stage start, and
+ * abort/cleanup.
+ */
+public interface INodeController extends Remote {
+    public String getId() throws Exception;
+
+    public NCConfig getConfiguration() throws Exception;
+
+    public NodeCapability getNodeCapability() throws Exception;
+
+    /**
+     * Phase 1: create the stagelet, the push runtimes for the given
+     * (activity, partition) assignments, and receive-side endpoints; returns the
+     * map of input port instances to the endpoints created on this node.
+     */
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
+        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception;
+
+    /**
+     * Phase 2: wire send-side writers using the global port map aggregated from
+     * every node's phase-1 results.
+     */
+    public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Map<ActivityNodeId, Set<Integer>> tasks,
+        Map<OperatorDescriptorId, Set<Integer>> opPartitions, Map<PortInstanceId, Endpoint> globalPortMap)
+        throws Exception;
+
+    public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
+
+    public void abortJoblet(UUID jobId, UUID stageId) throws Exception;
+
+    public void cleanUpJob(UUID jobId) throws Exception;
+
+    public void startStage(UUID jobId, UUID stageId) throws Exception;
+
+    public void notifyRegistration(IClusterController ccs) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
new file mode 100644
index 0000000..7f0a44e
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.nodecontroller;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+/**
+ * Per-job state held on a node controller: the stagelets of this job currently
+ * active on the node, plus lazily-created per-(operator, partition)
+ * environments used to carry state between an operator's activities.
+ *
+ * Fixes vs. original: removed the unused serialVersionUID (the class is not
+ * Serializable), and made all stageletMap accessors synchronized -- previously
+ * only notifyStageletComplete was synchronized even though setStagelet,
+ * getStagelet and notifyStageletFailed touch the same map.
+ */
+public class Joblet {
+    private final NodeControllerService nodeController;
+
+    private final UUID jobId;
+
+    /** Active stagelets of this job on this node, keyed by stage id. */
+    private final Map<UUID, Stagelet> stageletMap;
+
+    /** Lazily created environments, keyed by operator id then partition. */
+    private final Map<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>> envMap;
+
+    public Joblet(NodeControllerService nodeController, UUID jobId) throws Exception {
+        this.nodeController = nodeController;
+        this.jobId = jobId;
+        stageletMap = new HashMap<UUID, Stagelet>();
+        envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+    }
+
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    /**
+     * Returns the environment for the given operator partition, creating it on
+     * first access. NOTE(review): envMap is not synchronized; assumes phase-1
+     * initialization runs on one thread per joblet -- confirm.
+     */
+    public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
+        if (!envMap.containsKey(hod.getOperatorId())) {
+            envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+        }
+        Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
+        if (!opEnvMap.containsKey(partition)) {
+            opEnvMap.put(partition, new OperatorEnvironmentImpl());
+        }
+        return opEnvMap.get(partition);
+    }
+
+    /** Simple string-keyed property bag backing IOperatorEnvironment. */
+    private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+        private final Map<String, Object> map;
+
+        public OperatorEnvironmentImpl() {
+            map = new HashMap<String, Object>();
+        }
+
+        @Override
+        public Object get(String name) {
+            return map.get(name);
+        }
+
+        @Override
+        public void set(String name, Object value) {
+            map.put(name, value);
+        }
+    }
+
+    // stageletMap is installed into by RPC threads and removed from by worker
+    // threads on completion/failure, so all accessors are synchronized.
+    public synchronized void setStagelet(UUID stageId, Stagelet stagelet) {
+        stageletMap.put(stageId, stagelet);
+    }
+
+    public synchronized Stagelet getStagelet(UUID stageId) throws Exception {
+        return stageletMap.get(stageId);
+    }
+
+    public Executor getExecutor() {
+        return nodeController.getExecutor();
+    }
+
+    public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+        stageletMap.remove(stageId);
+        nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
+    }
+
+    public synchronized void notifyStageletFailed(UUID stageId, int attempt) throws Exception {
+        stageletMap.remove(stageId);
+        nodeController.notifyStageFailed(jobId, stageId, attempt);
+    }
+
+    public NodeControllerService getNodeController() {
+        return nodeController;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
new file mode 100644
index 0000000..461fc18
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -0,0 +1,459 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.nodecontroller;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.comm.ConnectionManager;
+import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.comm.Endpoint;
+import edu.uci.ics.hyracks.config.NCConfig;
+import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
+import edu.uci.ics.hyracks.controller.NodeCapability;
+import edu.uci.ics.hyracks.controller.NodeParameters;
+import edu.uci.ics.hyracks.controller.clustercontroller.IClusterController;
+import edu.uci.ics.hyracks.job.JobPlan;
+import edu.uci.ics.hyracks.runtime.OperatorRunnable;
+
+public class NodeControllerService extends AbstractRemoteService implements INodeController {
+    private static final long serialVersionUID = 1L;
+
+    private NCConfig ncConfig;
+
+    private final String id;
+
+    private final IHyracksContext ctx;
+
+    private final NodeCapability nodeCapability;
+
+    private final ConnectionManager connectionManager;
+
+    private final Timer timer;
+
+    private IClusterController ccs;
+
+    private Map<UUID, Joblet> jobletMap;
+
+    private Executor executor;
+
+    private NodeParameters nodeParameters;
+
+    /**
+     * Creates a node controller service from the given configuration.
+     *
+     * @throws IllegalArgumentException if no node id is configured
+     * @throws Exception if the configured data IP address is invalid or the
+     *             connection manager cannot be created
+     */
+    public NodeControllerService(NCConfig ncConfig) throws Exception {
+        this.ncConfig = ncConfig;
+        id = ncConfig.nodeId;
+        // Fix: validate the id before doing any further setup (it was previously
+        // checked only after the context was built) and use a specific exception
+        // type instead of raw Exception.
+        if (id == null) {
+            throw new IllegalArgumentException("id not set");
+        }
+        this.ctx = new HyracksContext(ncConfig.frameSize);
+        nodeCapability = computeNodeCapability();
+        connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
+        jobletMap = new HashMap<UUID, Joblet>();
+        executor = Executors.newCachedThreadPool();
+        // Daemon timer so pending heartbeat tasks never keep the JVM alive.
+        timer = new Timer(true);
+    }
+
+    private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+
+    /**
+     * Starts the service: brings up the connection manager, registers this node
+     * with the cluster controller over RMI, and begins sending heartbeats at the
+     * period the cluster controller dictates.
+     */
+    @Override
+    public void start() throws Exception {
+        LOGGER.log(Level.INFO, "Starting NodeControllerService");
+        connectionManager.start();
+        // Look up the cluster controller's RMI registry and register ourselves;
+        // the returned parameters include the heartbeat period.
+        Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
+        IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
+        this.nodeParameters = cc.registerNode(this);
+
+        // Schedule heartbeat generator.
+        timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
+
+        LOGGER.log(Level.INFO, "Started NodeControllerService");
+    }
+
+    /**
+     * Stops the connection manager. NOTE(review): the heartbeat timer is not
+     * cancelled here; it is a daemon timer so it dies with the JVM -- confirm
+     * this is intended for in-process restarts.
+     */
+    @Override
+    public void stop() throws Exception {
+        LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+        connectionManager.stop();
+        LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+    }
+
+    /** Returns this node's id (narrows the interface's throws Exception clause). */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /** Returns the capability computed once at construction time. */
+    @Override
+    public NodeCapability getNodeCapability() throws Exception {
+        return nodeCapability;
+    }
+
+    /** Exposes the data-plane connection manager (used by stagelets/joblets). */
+    public ConnectionManager getConnectionManager() {
+        return connectionManager;
+    }
+
+    /** Builds this node's advertised capability (currently just the CPU count). */
+    private static NodeCapability computeNodeCapability() {
+        NodeCapability capability = new NodeCapability();
+        capability.setCPUCount(Runtime.getRuntime().availableProcessors());
+        return capability;
+    }
+
+    /**
+     * Parses the dotted-quad data IP address configured for this node.
+     *
+     * Fixes vs. original: MessageFormat placeholders are {0}-style, not %s, so
+     * the error message never actually contained the offending address (and read
+     * "does is not"); octets are now range-checked instead of being silently
+     * truncated by the byte cast (e.g. "999" previously became a garbage byte).
+     *
+     * @throws Exception if the string is not a well-formed IPv4 address
+     */
+    private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+        String ipaddrStr = ncConfig.dataIPAddress;
+        ipaddrStr = ipaddrStr.trim();
+        Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
+        Matcher m = pattern.matcher(ipaddrStr);
+        if (!m.matches()) {
+            throw new Exception(MessageFormat.format(
+                    "Connection Manager IP Address String {0} is not a valid IP Address.", ipaddrStr));
+        }
+        byte[] ipBytes = new byte[4];
+        for (int i = 0; i < 4; ++i) {
+            int octet = Integer.parseInt(m.group(i + 1));
+            if (octet > 255) {
+                throw new Exception(MessageFormat.format(
+                        "Connection Manager IP Address String {0} is not a valid IP Address.", ipaddrStr));
+            }
+            ipBytes[i] = (byte) octet;
+        }
+        return InetAddress.getByAddress(ipBytes);
+    }
+
+    /**
+     * Phase 1 of joblet initialization: creates the stagelet, instantiates a
+     * push runtime for every (activity, partition) assigned to this node, and
+     * opens a receive-side endpoint for each input connector. Returns the map
+     * of input port instances to the endpoints created here; the cluster
+     * controller aggregates these maps across nodes and feeds the result back
+     * in phase 2 so senders can connect.
+     *
+     * Fix vs. original: failures are logged through LOGGER with the exception
+     * attached instead of e.printStackTrace(), then rethrown as before.
+     */
+    @Override
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, final JobPlan plan, UUID stageId,
+            int attempt, Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions)
+            throws Exception {
+        try {
+            LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
+
+            // Record descriptors are resolved lazily from the job specification.
+            IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+                @Override
+                public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
+                    return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
+                }
+
+                @Override
+                public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
+                    return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+                }
+            };
+
+            final Joblet joblet = getLocalJoblet(jobId);
+
+            Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
+            joblet.setStagelet(stageId, stagelet);
+
+            final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+            Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
+
+            List<Endpoint> endpointList = new ArrayList<Endpoint>();
+
+            for (ActivityNodeId hanId : tasks.keySet()) {
+                IActivityNode han = plan.getActivityNodeMap().get(hanId);
+                if (LOGGER.isLoggable(Level.FINEST)) {
+                    LOGGER.finest("Initializing " + hanId + " -> " + han);
+                }
+                IOperatorDescriptor op = han.getOwner();
+                List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
+                for (int i : tasks.get(hanId)) {
+                    IOperatorNodePushable hon = han.createPushRuntime(ctx, joblet.getEnvironment(op, i), rdp, i,
+                            opPartitions.get(op.getOperatorId()).size());
+                    OperatorRunnable or = new OperatorRunnable(ctx, hon);
+                    stagelet.setOperator(op.getOperatorId(), i, or);
+                    if (inputs != null) {
+                        for (int j = 0; j < inputs.size(); ++j) {
+                            // Only a single input connector per activity is supported.
+                            if (j >= 1) {
+                                throw new IllegalStateException();
+                            }
+                            IConnectorDescriptor conn = inputs.get(j);
+                            OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+                                    .getOperatorId();
+                            OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+                                    .getOperatorId();
+                            // One endpoint per (input connector, partition); senders
+                            // find it via the port map returned from this phase.
+                            Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
+                            endpointList.add(endpoint);
+                            DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
+                                    stageId);
+                            connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
+                            PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+                                    .getTaskInputMap().get(hanId).get(j), i);
+                            if (LOGGER.isLoggable(Level.FINEST)) {
+                                LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
+                            }
+                            portMap.put(piId, endpoint);
+                            IFrameReader reader = createReader(conn, drlf, i, plan, stagelet,
+                                    opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+                            or.setFrameReader(reader);
+                        }
+                    }
+                    honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
+                }
+            }
+
+            stagelet.setEndpointList(endpointList);
+
+            return portMap;
+        } catch (Exception e) {
+            // Log-and-rethrow (was e.printStackTrace()): keep the stack trace in
+            // the service log while still propagating the failure to the caller.
+            LOGGER.log(Level.SEVERE, "Joblet initialization (phase 1) failed", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Creates the receive-side frame reader for a connector. If the job was
+     * submitted with COLLECT_FRAME_COUNTS, the reader is wrapped in a decorator
+     * that counts delivered frames and records the count in the stagelet's
+     * statistics map on close().
+     */
+    private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
+            final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
+            throws HyracksDataException {
+        final IFrameReader reader = conn.createReceiveSideReader(ctx, plan.getJobSpecification()
+                .getConnectorRecordDescriptor(conn), demux, receiverIndex, nProducerCount,
+                nConsumerCount);
+
+        return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
+            private int frameCount;
+
+            @Override
+            public void open() throws HyracksDataException {
+                frameCount = 0;
+                reader.open();
+            }
+
+            @Override
+            public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                boolean status = reader.nextFrame(buffer);
+                if (status) {
+                    // Count only frames actually delivered, not the final EOF call.
+                    ++frameCount;
+                }
+                return status;
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                reader.close();
+                stagelet.getStatistics()
+                        .getStatisticsMap()
+                        .put("framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
+                                String.valueOf(frameCount));
+            }
+        } : reader;
+    }
+
+    /**
+     * Phase 2 of joblet initialization on this node. For every task partition
+     * assigned here, wires each output connector to a send-side writer that
+     * connects (via the connection manager) to the endpoint the receiving
+     * partition registered during phase 1, then installs the partition's
+     * runnable on the stagelet.
+     *
+     * @param jobId         job being initialized
+     * @param plan          full job plan (activity graph + specification)
+     * @param stageId       stage within the job
+     * @param tasks         activity node -> partition numbers assigned to this node
+     * @param opPartitions  operator -> all partition numbers (cluster-wide)
+     * @param globalPortMap receiver port instance -> network endpoint, for all nodes
+     */
+    @Override
+    public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
+            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+            final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
+        try {
+            LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
+            final Joblet ji = getLocalJoblet(jobId);
+            Stagelet si = (Stagelet) ji.getStagelet(stageId);
+            final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
+
+            final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
+
+            final JobSpecification spec = plan.getJobSpecification();
+
+            for (ActivityNodeId hanId : tasks.keySet()) {
+                IActivityNode han = plan.getActivityNodeMap().get(hanId);
+                IOperatorDescriptor op = han.getOwner();
+                List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
+                for (int i : tasks.get(hanId)) {
+                    OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
+                    if (outputs != null) {
+                        for (int j = 0; j < outputs.size(); ++j) {
+                            final IConnectorDescriptor conn = outputs.get(j);
+                            OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+                                    .getOperatorId();
+                            OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+                                    .getOperatorId();
+                            final int senderIndex = i;
+                            // Factory that resolves a receiver partition index to a frame
+                            // writer backed by a network connection to that receiver's
+                            // endpoint (registered in phase 1, published in globalPortMap).
+                            IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
+                                @Override
+                                public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
+                                    PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
+                                            Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+                                    Endpoint ep = globalPortMap.get(piId);
+                                    if (ep == null) {
+                                        // Missing endpoint means phase 1 did not register this
+                                        // receiver; fail loudly rather than hang.
+                                        LOGGER.info("Got null Endpoint for " + piId);
+                                        throw new NullPointerException();
+                                    }
+                                    if (LOGGER.isLoggable(Level.FINEST)) {
+                                        LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
+                                    }
+                                    return createWriter(connectionManager.connect(ep.getNetworkAddress(),
+                                            ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+                                }
+                            };
+                            // Let the connector build its send-side writer over the endpoint
+                            // factory and attach it as output j of this operator partition.
+                            or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan.getJobSpecification()
+                                    .getConnectorRecordDescriptor(conn), edwFactory, i,
+                                    opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size()), spec
+                                    .getConnectorRecordDescriptor(conn));
+                        }
+                    }
+                    stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
+                }
+            }
+        } catch (Exception e) {
+            // Surface the failure in the NC's log before propagating it to the caller.
+            e.printStackTrace();
+            throw e;
+        }
+    }
+
+    /**
+     * Optionally decorates a sender-side frame writer with a frame counter.
+     * When the job was submitted with COLLECT_FRAME_COUNTS, the returned
+     * wrapper counts frames pushed over this (sender, receiver) channel and
+     * records the total in the stagelet statistics on close; otherwise the
+     * original writer is returned untouched.
+     */
+    private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
+            final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
+        if (!plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS)) {
+            // Counting disabled: hand back the raw writer without wrapping.
+            return writer;
+        }
+        return new IFrameWriter() {
+            private int frameCount;
+
+            @Override
+            public void open() throws HyracksDataException {
+                frameCount = 0;
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                ++frameCount;
+                writer.nextFrame(buffer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
+                // Publish the per-channel frame count under a key identifying
+                // the connector, sender partition, and receiver partition.
+                stagelet.getStatistics().getStatisticsMap().put(
+                        "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "."
+                                + receiverIndex, String.valueOf(frameCount));
+            }
+        };
+    }
+
+    /**
+     * Finalizes joblet initialization for a stage: once all writers are wired,
+     * the endpoints advertised for this stage no longer need to accept new
+     * connections, so they are withdrawn and the endpoint list is released.
+     */
+    @Override
+    public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception {
+        Joblet joblet = getLocalJoblet(jobId);
+        Stagelet stagelet = (Stagelet) joblet.getStagelet(stageId);
+        for (Endpoint endpoint : stagelet.getEndpointList()) {
+            connectionManager.unacceptConnection(endpoint.getEndpointId());
+        }
+        stagelet.setEndpointList(null);
+    }
+
+    /**
+     * Returns the joblet for the given job, lazily creating and caching one on
+     * first use. Synchronized so concurrent initialization calls for the same
+     * job share a single Joblet instance.
+     */
+    private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+        Joblet joblet = jobletMap.get(jobId);
+        if (joblet != null) {
+            return joblet;
+        }
+        joblet = new Joblet(this, jobId);
+        jobletMap.put(jobId, joblet);
+        return joblet;
+    }
+
+    /** Returns the executor on which this node's operator runnables are scheduled. */
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    /**
+     * Drops all node-local state for the given job and dumps connection
+     * statistics. Synchronized with getLocalJoblet(...) so a joblet is not
+     * created and removed concurrently.
+     */
+    @Override
+    public synchronized void cleanUpJob(UUID jobId) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Cleaning up after job: " + jobId);
+        }
+        jobletMap.remove(jobId);
+        connectionManager.dumpStats();
+    }
+
+    /**
+     * Starts the named stagelet, releasing its installed operator runnables.
+     * Silently ignores unknown job or stage ids (the joblet/stagelet may not
+     * have been initialized on this node).
+     */
+    @Override
+    public void startStage(UUID jobId, UUID stageId) throws Exception {
+        Joblet joblet = jobletMap.get(jobId);
+        if (joblet == null) {
+            return;
+        }
+        Stagelet stagelet = joblet.getStagelet(stageId);
+        if (stagelet == null) {
+            return;
+        }
+        stagelet.start();
+    }
+
+    // Forwards a stagelet-completion notification (with its statistics) from the
+    // local joblet to the cluster controller.
+    public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+        ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
+    }
+
+    // Forwards a stagelet-failure notification from the local joblet to the
+    // cluster controller.
+    public void notifyStageFailed(UUID jobId, UUID stageId, int attempt) throws Exception {
+        ccs.notifyStageletFailure(jobId, stageId, attempt, id);
+    }
+
+    /**
+     * Callback invoked once this node's registration has been accepted;
+     * retains the cluster controller handle for subsequent notifications.
+     */
+    @Override
+    public void notifyRegistration(IClusterController ccs) throws Exception {
+        this.ccs = ccs;
+    }
+
+    /** Returns the startup configuration of this node controller. */
+    @Override
+    public NCConfig getConfiguration() throws Exception {
+        return ncConfig;
+    }
+
+    /**
+     * Periodic timer task that reports this node's liveness to the cluster
+     * controller. A failed heartbeat is printed and swallowed so a transient
+     * communication error does not kill the timer thread.
+     */
+    private class HeartbeatTask extends TimerTask {
+        // final: assigned once at construction and never reassigned.
+        private final IClusterController cc;
+
+        public HeartbeatTask(IClusterController cc) {
+            this.cc = cc;
+        }
+
+        @Override
+        public void run() {
+            try {
+                cc.nodeHeartbeat(id);
+            } catch (Exception e) {
+                // Best-effort: keep the Timer alive across transient failures.
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Aborts the named stagelet on this node and tears down its network
+     * connections. Unknown job or stage ids are ignored.
+     */
+    @Override
+    public synchronized void abortJoblet(UUID jobId, UUID stageId) throws Exception {
+        Joblet joblet = jobletMap.get(jobId);
+        if (joblet == null) {
+            return;
+        }
+        Stagelet stagelet = joblet.getStagelet(stageId);
+        if (stagelet == null) {
+            return;
+        }
+        stagelet.abort();
+        connectionManager.abortConnections(jobId, stageId);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
new file mode 100644
index 0000000..c9bbc29
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.nodecontroller;
+
+import java.rmi.RemoteException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.comm.Endpoint;
+import edu.uci.ics.hyracks.runtime.OperatorRunnable;
+
+/**
+ * Node-local runtime state for a single stage of a job: the operator
+ * runnables hosted on this node, the communication endpoints they expose,
+ * and the statistics gathered while the stage executes.
+ *
+ * Start/abort and completion bookkeeping are synchronized on this instance;
+ * installed runnables execute on the owning joblet's executor and block in
+ * waitUntilStarted() until start() or abort() is called.
+ */
+public class Stagelet {
+    private static final Logger LOGGER = Logger.getLogger(Stagelet.class.getName());
+
+    private final Joblet joblet;
+
+    private final UUID stageId;
+
+    // Scheduling attempt number; echoed back in completion/failure notifications.
+    private final int attempt;
+
+    // Operator instance id -> runnable, for every partition hosted on this node.
+    private final Map<OperatorInstanceId, OperatorRunnable> honMap;
+
+    // Endpoints advertised during initialization; cleared when initialization commits.
+    private List<Endpoint> endpointList;
+
+    private boolean started;
+
+    // volatile: read by runnables outside the monitor after waitUntilStarted().
+    private volatile boolean abort;
+
+    // Installed operator instances that have not yet reported completion.
+    private final Set<OperatorInstanceId> pendingOperators;
+
+    private final StageletStatistics stats;
+
+    public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
+        this.joblet = joblet;
+        this.stageId = stageId;
+        this.attempt = attempt;
+        pendingOperators = new HashSet<OperatorInstanceId>();
+        started = false;
+        honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
+        stats = new StageletStatistics();
+        stats.setNodeId(nodeId);
+    }
+
+    public void setOperator(OperatorDescriptorId odId, int partition, OperatorRunnable hon) {
+        honMap.put(new OperatorInstanceId(odId, partition), hon);
+    }
+
+    public Map<OperatorInstanceId, OperatorRunnable> getOperatorMap() {
+        return honMap;
+    }
+
+    public void setEndpointList(List<Endpoint> endpointList) {
+        this.endpointList = endpointList;
+    }
+
+    public List<Endpoint> getEndpointList() {
+        return endpointList;
+    }
+
+    /**
+     * Releases all runnables parked in waitUntilStarted() and stamps the
+     * stage start time. May be called at most once.
+     *
+     * @throws Exception if this stagelet was already started
+     */
+    public synchronized void start() throws Exception {
+        if (started) {
+            // BUG FIX: the original message said "Joblet already started",
+            // which misidentified the failing component.
+            throw new Exception("Stagelet already started");
+        }
+        started = true;
+        stats.setStartTime(new Date());
+        notifyAll();
+    }
+
+    /**
+     * Marks this stagelet aborted and propagates the abort to every installed
+     * operator runnable. Runnables that have not yet started exit without
+     * doing any work.
+     */
+    public synchronized void abort() {
+        this.abort = true;
+        for (OperatorRunnable r : honMap.values()) {
+            r.abort();
+        }
+    }
+
+    /**
+     * Registers the given operator instance as pending and schedules its
+     * runnable on the joblet's executor. The runnable waits for start()
+     * before running and reports completion when done.
+     */
+    public void installRunnable(final OperatorInstanceId opIId) {
+        pendingOperators.add(opIId);
+        final OperatorRunnable hon = honMap.get(opIId);
+        joblet.getExecutor().execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    waitUntilStarted();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                    return;
+                }
+                if (abort) {
+                    return;
+                }
+                try {
+                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                            + opIId.getPartition() + ": STARTING");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    // notifyOperatorFailure(opIId);
+                }
+                try {
+                    hon.run();
+                    notifyOperatorCompletion(opIId);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    // NOTE(review): operator failures are only printed here;
+                    // notifyOperatorFailure is never invoked, so a failed operator
+                    // may leave the stage pending forever — confirm intended.
+                    // notifyOperatorFailure(opIId);
+                }
+                try {
+                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                            + opIId.getPartition() + ": TERMINATED");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    // notifyOperatorFailure(opIId);
+                }
+            }
+        });
+    }
+
+    // Removes the instance from the pending set; when the last one finishes,
+    // stamps the end time and notifies the joblet that this stagelet is done.
+    protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
+        pendingOperators.remove(opIId);
+        if (pendingOperators.isEmpty()) {
+            stats.setEndTime(new Date());
+            try {
+                joblet.notifyStageletComplete(stageId, attempt, stats);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // Aborts the whole stagelet and reports the failure to the joblet.
+    protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
+        abort();
+        try {
+            joblet.notifyStageletFailed(stageId, attempt);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    // Parks the calling runnable until start() or abort() is observed.
+    private synchronized void waitUntilStarted() throws InterruptedException {
+        while (!started && !abort) {
+            wait();
+        }
+    }
+
+    public StageletStatistics getStatistics() {
+        return stats;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
new file mode 100644
index 0000000..bb1ba84
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.driver;
+
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.hyracks.config.CCConfig;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
+
+public class CCDriver {
+    /**
+     * Entry point for the cluster controller process: parses command-line
+     * options into a CCConfig, starts the controller service, and then parks
+     * the main thread forever (all work happens on the service's threads).
+     */
+    public static void main(String[] args) throws Exception {
+        CCConfig config = new CCConfig();
+        CmdLineParser parser = new CmdLineParser(config);
+        try {
+            parser.parseArgument(args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            parser.printUsage(System.err);
+            return;
+        }
+        ClusterControllerService service = new ClusterControllerService(config);
+        service.start();
+        // Keep the JVM alive indefinitely.
+        while (true) {
+            Thread.sleep(100000);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
new file mode 100644
index 0000000..3c856ab
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.driver;
+
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.dcache.client.DCacheClientConfig;
+import edu.uci.ics.hyracks.config.NCConfig;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
+
+public class NCDriver {
+    /**
+     * Entry point for a node controller process: parses command-line options,
+     * initializes the DCache client singleton, starts the node controller
+     * service (with a shutdown hook to stop it), and parks the main thread.
+     */
+    public static void main(String[] args) throws Exception {
+        NCConfig config = new NCConfig();
+        CmdLineParser parser = new CmdLineParser(config);
+        try {
+            parser.parseArgument(args);
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            parser.printUsage(System.err);
+            return;
+        }
+
+        // Configure and initialize the distributed-cache client.
+        DCacheClientConfig dcacheConfig = new DCacheClientConfig();
+        dcacheConfig.servers = config.dcacheClientServers;
+        dcacheConfig.serverLocal = config.dcacheClientServerLocal;
+        dcacheConfig.path = config.dcacheClientPath;
+        DCacheClient.get().init(dcacheConfig);
+
+        final NodeControllerService service = new NodeControllerService(config);
+        service.start();
+        // Stop the service cleanly when the JVM is asked to exit.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    service.stop();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        // Keep the JVM alive; all real work happens on service threads.
+        while (true) {
+            Thread.sleep(10000);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/IOperatorDescriptorVisitor.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/IOperatorDescriptorVisitor.java
new file mode 100644
index 0000000..0e63c94
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/IOperatorDescriptorVisitor.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.job;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+
+/**
+ * Visitor over the operator descriptors of a job specification; used with
+ * PlanUtils.visit(...).
+ */
+public interface IOperatorDescriptorVisitor {
+    /**
+     * Called once per visited operator descriptor.
+     *
+     * @param op the operator being visited
+     * @throws Exception if the visit fails; aborts the traversal
+     */
+    public void visit(IOperatorDescriptor op) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/JobPlan.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/JobPlan.java
new file mode 100644
index 0000000..5516870
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/JobPlan.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.job;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * Serializable execution plan for a job: couples the original job
+ * specification with the activity graph derived from it (activity nodes,
+ * blocking constraints, and the task/operator input-output wiring) and the
+ * final stage of the computed stage DAG.
+ */
+public class JobPlan implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final JobSpecification jobSpec;
+
+    private final EnumSet<JobFlag> jobFlags;
+
+    // All activity nodes of the plan, keyed by id.
+    private final Map<ActivityNodeId, IActivityNode> activityNodes;
+
+    // blocker -> activities that must not start until the blocker finishes.
+    private final Map<ActivityNodeId, Set<ActivityNodeId>> blocker2blockedMap;
+
+    // Inverse of blocker2blockedMap: blocked activity -> its blockers.
+    private final Map<ActivityNodeId, Set<ActivityNodeId>> blocked2blockerMap;
+
+    // Operator -> the activity nodes (tasks) it expands into.
+    private final Map<OperatorDescriptorId, Set<ActivityNodeId>> operatorTaskMap;
+
+    // Task -> indexes of the owning operator's inputs consumed by the task.
+    private final Map<ActivityNodeId, List<Integer>> taskInputMap;
+
+    // Task -> indexes of the owning operator's outputs produced by the task.
+    private final Map<ActivityNodeId, List<Integer>> taskOutputMap;
+
+    // Operator input index -> activity node consuming that input.
+    private final Map<OperatorDescriptorId, List<ActivityNodeId>> operatorInputMap;
+
+    // Operator output index -> activity node producing that output.
+    private final Map<OperatorDescriptorId, List<ActivityNodeId>> operatorOutputMap;
+
+    // Final stage of the plan; set by the planner after stage computation.
+    private JobStage endStage;
+
+    public JobPlan(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+        this.jobSpec = jobSpec;
+        this.jobFlags = jobFlags;
+        activityNodes = new HashMap<ActivityNodeId, IActivityNode>();
+        blocker2blockedMap = new HashMap<ActivityNodeId, Set<ActivityNodeId>>();
+        blocked2blockerMap = new HashMap<ActivityNodeId, Set<ActivityNodeId>>();
+        operatorTaskMap = new HashMap<OperatorDescriptorId, Set<ActivityNodeId>>();
+        taskInputMap = new HashMap<ActivityNodeId, List<Integer>>();
+        taskOutputMap = new HashMap<ActivityNodeId, List<Integer>>();
+        operatorInputMap = new HashMap<OperatorDescriptorId, List<ActivityNodeId>>();
+        operatorOutputMap = new HashMap<OperatorDescriptorId, List<ActivityNodeId>>();
+    }
+
+    public JobSpecification getJobSpecification() {
+        return jobSpec;
+    }
+
+    public EnumSet<JobFlag> getJobFlags() {
+        return jobFlags;
+    }
+
+    public Map<ActivityNodeId, IActivityNode> getActivityNodeMap() {
+        return activityNodes;
+    }
+
+    public Map<ActivityNodeId, Set<ActivityNodeId>> getBlocker2BlockedMap() {
+        return blocker2blockedMap;
+    }
+
+    public Map<ActivityNodeId, Set<ActivityNodeId>> getBlocked2BlockerMap() {
+        return blocked2blockerMap;
+    }
+
+    public Map<OperatorDescriptorId, Set<ActivityNodeId>> getOperatorTaskMap() {
+        return operatorTaskMap;
+    }
+
+    public Map<ActivityNodeId, List<Integer>> getTaskInputMap() {
+        return taskInputMap;
+    }
+
+    public Map<ActivityNodeId, List<Integer>> getTaskOutputMap() {
+        return taskOutputMap;
+    }
+
+    public Map<OperatorDescriptorId, List<ActivityNodeId>> getOperatorInputMap() {
+        return operatorInputMap;
+    }
+
+    public Map<OperatorDescriptorId, List<ActivityNodeId>> getOperatorOutputMap() {
+        return operatorOutputMap;
+    }
+
+    public void setEndStage(JobStage endStage) {
+        this.endStage = endStage;
+    }
+
+    public JobStage getEndStage() {
+        return endStage;
+    }
+
+    /**
+     * Resolves a task's input indexes to the actual connector descriptors of
+     * its owning operator.
+     *
+     * @return the input connectors, or null if the task has no inputs
+     */
+    public List<IConnectorDescriptor> getTaskInputs(ActivityNodeId hanId) {
+        List<Integer> inputIndexes = taskInputMap.get(hanId);
+        if (inputIndexes == null) {
+            return null;
+        }
+        OperatorDescriptorId ownerId = hanId.getOperatorDescriptorId();
+        List<IConnectorDescriptor> inputs = new ArrayList<IConnectorDescriptor>();
+        for (Integer i : inputIndexes) {
+            inputs.add(jobSpec.getInputConnectorDescriptor(ownerId, i));
+        }
+        return inputs;
+    }
+
+    /**
+     * Resolves a task's output indexes to the actual connector descriptors of
+     * its owning operator.
+     *
+     * @return the output connectors, or null if the task has no outputs
+     */
+    public List<IConnectorDescriptor> getTaskOutputs(ActivityNodeId hanId) {
+        List<Integer> outputIndexes = taskOutputMap.get(hanId);
+        if (outputIndexes == null) {
+            return null;
+        }
+        OperatorDescriptorId ownerId = hanId.getOperatorDescriptorId();
+        List<IConnectorDescriptor> outputs = new ArrayList<IConnectorDescriptor>();
+        for (Integer i : outputIndexes) {
+            outputs.add(jobSpec.getOutputConnectorDescriptor(ownerId, i));
+        }
+        return outputs;
+    }
+
+    // Maps a task-local input index to the operator-level input index, then
+    // looks up that input's record descriptor in the job specification.
+    public RecordDescriptor getTaskInputRecordDescriptor(ActivityNodeId hanId, int inputIndex) {
+        int opInputIndex = getTaskInputMap().get(hanId).get(inputIndex);
+        return jobSpec.getOperatorInputRecordDescriptor(hanId.getOperatorDescriptorId(), opInputIndex);
+    }
+
+    // Maps a task-local output index to the operator-level output index, then
+    // looks up that output's record descriptor in the job specification.
+    public RecordDescriptor getTaskOutputRecordDescriptor(ActivityNodeId hanId, int outputIndex) {
+        int opOutputIndex = getTaskOutputMap().get(hanId).get(outputIndex);
+        return jobSpec.getOperatorOutputRecordDescriptor(hanId.getOperatorDescriptorId(), opOutputIndex);
+    }
+
+    @Override
+    public String toString() {
+        // Chain appends instead of concatenating inside append(), which would
+        // build intermediate Strings and defeat the StringBuilder.
+        StringBuilder buffer = new StringBuilder();
+        buffer.append("ActivityNodes: ").append(activityNodes);
+        buffer.append('\n');
+        buffer.append("Blocker->Blocked: ").append(blocker2blockedMap);
+        buffer.append('\n');
+        buffer.append("Blocked->Blocker: ").append(blocked2blockerMap);
+        buffer.append('\n');
+        return buffer.toString();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/JobStage.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/JobStage.java
new file mode 100644
index 0000000..0048ce9
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/JobStage.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.job;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+
+public class JobStage implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final UUID id;
+
+    private final Set<ActivityNodeId> tasks;
+
+    private final Set<JobStage> dependencies;
+
+    private final Set<JobStage> dependents;
+
+    private boolean started;
+
+    public JobStage(Set<ActivityNodeId> tasks) {
+        this.id = UUID.randomUUID();
+        this.tasks = tasks;
+        dependencies = new HashSet<JobStage>();
+        dependents = new HashSet<JobStage>();
+    }
+
+    public UUID getId() {
+        return id;
+    }
+
+    public Set<ActivityNodeId> getTasks() {
+        return tasks;
+    }
+
+    public void addDependency(JobStage stage) {
+        dependencies.add(stage);
+    }
+
+    public void addDependent(JobStage stage) {
+        dependents.add(stage);
+    }
+
+    public Set<JobStage> getDependencies() {
+        return dependencies;
+    }
+
+    @Override
+    public int hashCode() {
+        return id == null ? 0 : id.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof JobStage)) {
+            return false;
+        }
+        return id == ((JobStage) o).id;
+    }
+
+    @Override
+    public String toString() {
+        return "SID:" + id + ": " + tasks;
+    }
+
+    public boolean isStarted() {
+        return started;
+    }
+
+    public void setStarted() {
+        started = true;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/PlanUtils.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/PlanUtils.java
new file mode 100644
index 0000000..e2337f7
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/job/PlanUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.job;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/** Helpers for traversing the operators of a job specification. */
+public class PlanUtils {
+    /**
+     * Applies the visitor to every distinct operator (by operator id) in the
+     * specification's operator map.
+     */
+    public static void visit(JobSpecification spec, IOperatorDescriptorVisitor visitor) throws Exception {
+        Set<OperatorDescriptorId> seen = new HashSet<OperatorDescriptorId>();
+        for (IOperatorDescriptor op : spec.getOperatorMap().values()) {
+            visitOperator(visitor, seen, op);
+        }
+    }
+
+    private static void visitOperator(IOperatorDescriptorVisitor visitor, Set<OperatorDescriptorId> seen,
+            IOperatorDescriptor op) throws Exception {
+        // BUG FIX: the original checked seen.contains(op), testing an
+        // IOperatorDescriptor against a Set<OperatorDescriptorId>; that is
+        // always false, so the "seen" set never suppressed a repeat visit.
+        // Set.add returns false when the id was already present.
+        if (seen.add(op.getOperatorId())) {
+            visitor.visit(op);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java
new file mode 100644
index 0000000..eff017a
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/resources/ResourceManager.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.resources;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.resources.IResourceManager;
+
+/**
+ * Default resource manager: allocates frames sized by the Hyracks context and
+ * hands out JVM temp files for spill/scratch use.
+ */
+public final class ResourceManager implements IResourceManager {
+    private final IHyracksContext context;
+
+    public ResourceManager(IHyracksContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public ByteBuffer allocateFrame() {
+        // Frame size is read from the context on every call (not cached).
+        return ByteBuffer.allocate(context.getFrameSize());
+    }
+
+    @Override
+    public File createFile(String prefix, String suffix) throws IOException {
+        return File.createTempFile(prefix, suffix);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
new file mode 100644
index 0000000..6ea3bf1
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.runtime;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Runnable that drives one operator instance: optionally pumps frames from an
+ * {@link IFrameReader} into the operator's {@link IOperatorNodePushable}, honoring
+ * an asynchronous abort request.
+ */
+public class OperatorRunnable implements Runnable {
+    private IOperatorNodePushable opNode;
+    // Optional input source; when null only open()/close() are invoked on opNode.
+    private IFrameReader reader;
+    // Single frame buffer reused for every reader -> opNode transfer.
+    private ByteBuffer buffer;
+    // volatile: written by another thread via abort(), read in the run() loop.
+    private volatile boolean abort;
+
+    public OperatorRunnable(IHyracksContext ctx, IOperatorNodePushable opNode) {
+        this.opNode = opNode;
+        buffer = ctx.getResourceManager().allocateFrame();
+    }
+
+    /** Connects the operator's output at the given index to a downstream writer. */
+    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+        opNode.setFrameWriter(index, writer, recordDesc);
+    }
+
+    /** Sets the input source; must be called before run() for the frames to be consumed. */
+    public void setFrameReader(IFrameReader reader) {
+        this.reader = reader;
+    }
+
+    /** Requests the pump loop to stop after the frame currently being read. */
+    public void abort() {
+        abort = true;
+    }
+
+    @Override
+    public void run() {
+        try {
+            opNode.open();
+            if (reader != null) {
+                reader.open();
+                while (reader.nextFrame(buffer)) {
+                    if (abort) {
+                        break;
+                    }
+                    // flip for the consumer to read what the reader wrote,
+                    // then compact to make the buffer writable again.
+                    buffer.flip();
+                    opNode.nextFrame(buffer);
+                    buffer.compact();
+                }
+                // Reader is closed even on abort, since the loop exits via break.
+                reader.close();
+            }
+            opNode.close();
+        } catch (Exception e) {
+            // Surface any failure to the executing thread/executor.
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/AbstractService.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/AbstractService.java
new file mode 100644
index 0000000..fcb6ea4
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/AbstractService.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.service;
+
+/**
+ * Convenience base class for services; currently contributes no behavior
+ * beyond implementing {@link IService}.
+ */
+public abstract class AbstractService implements IService {
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/ILifeCycle.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/ILifeCycle.java
new file mode 100644
index 0000000..e73f5b0
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/ILifeCycle.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.service;
+
+/**
+ * Minimal lifecycle contract for components that can be started and stopped.
+ */
+public interface ILifeCycle {
+    /** Starts the component; throws if startup fails. */
+    public void start() throws Exception;
+
+    /** Stops the component; throws if shutdown fails. */
+    public void stop() throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/IService.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/IService.java
new file mode 100644
index 0000000..aaf7e0e
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/service/IService.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.service;
+
+
+/**
+ * Marker interface for services; adds no members beyond {@link ILifeCycle}.
+ */
+public interface IService extends ILifeCycle {
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/web/WebServer.java b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/web/WebServer.java
new file mode 100644
index 0000000..b6ab52b
--- /dev/null
+++ b/hyracks-runtime/src/main/java/edu/uci/ics/hyracks/web/WebServer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.web;
+
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.ContextHandlerCollection;
+import org.eclipse.jetty.server.nio.SelectChannelConnector;
+
+/**
+ * Thin wrapper around an embedded Jetty {@link Server} wired with a single
+ * {@link SelectChannelConnector} and a {@link ContextHandlerCollection} of
+ * the supplied handlers.
+ */
+public class WebServer {
+    private Server server;
+    private SelectChannelConnector connector;
+
+    public WebServer(Handler[] handlers) throws Exception {
+        server = new Server();
+
+        connector = new SelectChannelConnector();
+
+        server.setConnectors(new Connector[] { connector });
+
+        // All handlers share one collection so each can claim its own context path.
+        ContextHandlerCollection handler = new ContextHandlerCollection();
+        handler.setHandlers(handlers);
+        server.setHandler(handler);
+    }
+
+    /**
+     * Sets the listen port. NOTE(review): Jetty connectors bind at server
+     * start, so this is presumably meant to be called before start() -- confirm.
+     */
+    public void setPort(int port) {
+        connector.setPort(port);
+    }
+
+    public void start() throws Exception {
+        server.start();
+    }
+
+    public void stop() throws Exception {
+        server.stop();
+    }
+}
\ No newline at end of file
diff --git a/hyracks-runtime/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-runtime/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
new file mode 100644
index 0000000..d346467
--- /dev/null
+++ b/hyracks-runtime/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -0,0 +1,310 @@
+program hyrackscc;
+
+import java.util.UUID;
+import java.util.Set;
+
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.job.JobPlan;
+
+/*
+ * Stage derivation: assign each activity a stage number. A blocked activity
+ * moves to a later stage than its blocker; activities joined by a connector
+ * (pipelined) are pulled into the same (max) stage.
+ */
+define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
+    activitynode(JobId, OperatorId, ActivityId, _);
+
+activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+    StageNumber2 <= StageNumber1
+    {
+        StageNumber := StageNumber1 + 1;
+    };
+
+activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+    connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+    StageNumber1 != StageNumber2
+    {
+        StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+    };
+
+/* Symmetric to PIPELINED_1: propagates the max stage to the producer side. */
+activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+    connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+    StageNumber1 != StageNumber2
+    {
+        StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+    };
+
+watch(activitystage_temp, a);
+
+watch(activityconnection, a);
+watch(activityblocked, a);
+watch(operatordescriptor, a);
+watch(connectordescriptor, a);
+
+watch(activitystage, a);
+watch(activitystage, i);
+watch(activitystage, d);
+
+/* Final stage assignment: the max stage number derived above per activity. */
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
+    activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
+
+/* Each distinct stage number of a job gets a fresh random stage UUID. */
+define(jobstage, keys(0, 1), {UUID, Integer, UUID});
+
+jobstage(JobId, StageNumber, StageId) :-
+    activitystage(JobId, _, _, StageNumber)
+    {
+        StageId := java.util.UUID.randomUUID();
+    };
+
+watch(jobstage, a);
+
+/*
+ * Attempt tracking: attempt 0 starts on jobstart; a new attempt begins once
+ * an abort of the current attempt completes.
+ */
+define(jobattempt, keys(), {UUID, Integer});
+
+jobattempt(JobId, 0) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
+    jobstart(JobId, _);
+
+jobattempt(JobId, NextAttempt) :-
+    jobattempt(JobId, Attempt),
+    stagestart(JobId, _, Attempt),
+    abortcomplete(JobId, _, Attempt)
+    {
+        NextAttempt := Attempt + 1;
+    };
+
+define(stagestart, keys(), {UUID, Integer, Integer});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
+
+watch(jobstart, i);
+
+stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
+    jobattempt#insert(JobId, Attempt);
+
+update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
+    jobstart(JobId, _);
+
+/* Advance to the next stage once the current stage finishes. */
+stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
+    stagestart(JobId, StageNumber, Attempt),
+    stagefinish#insert(StageId, StageNumber, Attempt, _)
+    {
+        NextStageNumber := StageNumber + 1;
+    };
+
+watch(stagestart, a);
+watch(stagestart, d);
+
+/*
+ * Placement: candidate locations are constraint-supplied locations restricted
+ * to currently available nodes; ties are broken by maximum benefit, and
+ * unconstrained clones are spread round-robin over ranked available nodes.
+ */
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
+    operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
+    availablenodes(NodeId);
+
+watch(availablenodes, a);
+watch(availablenodes, i);
+watch(availablenodes, d);
+
+define(availablenodecount, keys(0), {Integer, Integer});
+
+watch(availablenodecount, a);
+watch(availablenodecount, i);
+watch(availablenodecount, d);
+
+availablenodecount(0, count<NodeId>) :-
+    availablenodes(NodeId);
+
+watch(rankedavailablenodes, a);
+watch(rankedavailablenodes, i);
+watch(rankedavailablenodes, d);
+
+watch(operatorlocationcandidates, a);
+watch(operatorlocationcandidates, i);
+watch(operatorlocationcandidates, d);
+
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
+    operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
+
+watch(maxoperatorlocationbenefit, a);
+watch(maxoperatorlocationbenefit, i);
+watch(maxoperatorlocationbenefit, d);
+
+define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+watch(attemptoperatorlocationdecision, a);
+watch(attemptoperatorlocationdecision, i);
+watch(attemptoperatorlocationdecision, d);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+    jobattempt#insert(JobId, Attempt),
+    operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
+    maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+
+/* Round-robin fallback: clone rank modulo available-node count picks the node. */
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+    jobattempt#insert(JobId, Attempt),
+    operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
+    rankedavailablenodes(NodeId, NodeRank),
+    availablenodecount(_, NodeCount),
+    NodeRank == CloneRank % NodeCount;
+
+define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
+    operatorclonecount(JobId, OperatorId, NPartitions);
+
+define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
+    expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
+
+watch(operatorclonecountexpansiontotalorder, a);
+watch(operatorclonecountexpansiontotalorder, i);
+watch(operatorclonecountexpansiontotalorder, d);
+
+watch(operatorclonecount, a);
+watch(operatorclonecount, i);
+watch(operatorclonecount, d);
+
+/*
+ * Stage dispatch: when a stage starts, fan out one activitystart per placed
+ * activity, group them per node into stagelets, and aggregate per-node
+ * stagelet sets into a single start message for the stage.
+ */
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
+    stagestart#insert(JobId, StageNumber, Attempt),
+    operatordescriptor(JobId, OperatorId, _, _),
+    activitystage(JobId, OperatorId, ActivityId, StageNumber),
+    jobstage(JobId, StageNumber, StageId),
+    attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
+
+watch(activitystart, a);
+
+define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
+    activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
+    {
+        ActivityInfo := [ActivityId, Partition];
+    };
+
+watch(stageletstart, a);
+watch(stageletstart, i);
+
+define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+    stageletstart#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
+    availablenodes(NodeId),
+    ActivityInfoSet.size() != 0
+    {
+        Tuple := [NodeId, ActivityInfoSet];
+    };
+
+startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+    startmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
+
+watch(startmessage, a);
+watch(startmessage, i);
+
+/*
+ * Failure handling: a stagelet aborts on explicit failure, or when any node
+ * of its stage fails before completing; abort completion is acknowledged per
+ * node (or implied when the node itself is gone).
+ */
+define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+    stageletfailure(JobId, StageId, NodeId, Attempt),
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet);
+
+stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
+    stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
+    failednodes#insert(NodeId),
+    notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+watch(stageletabort, a);
+watch(stageletabort, i);
+watch(stageletabort, d);
+
+define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageabort(JobId, StageId, Attempt, set<NodeId>) :-
+    stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
+
+define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+    stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+    availablenodes(NodeId)
+    {
+        Tuple := [NodeId, ActivityIdSet];
+    };
+
+abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+    abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
+    TSet.size() != 0;
+
+watch(abortmessage, a);
+watch(abortmessage, i);
+
+define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+    abortnotify(JobId, StageId, NodeId, Attempt);
+
+/* A dead node's abort is considered complete without notification. */
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+    stageletabort(JobId, StageId, _, NodeId, Attempt, _),
+    notin availablenodes(NodeId);
+
+define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
+    stageletabortcomplete(JobId, StageId, NodeId, Attempt);
+
+define(abortcomplete, keys(), {UUID, UUID, Integer});
+
+/* Abort of a stage is complete when every aborting node has acknowledged. */
+abortcomplete(JobId, StageId, Attempt) :-
+    stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
+    stageabort(JobId, StageId, Attempt, NodeIdSet2),
+    NodeIdSet1.size() == NodeIdSet2.size();
+
+/*
+ * Completion: a stage finishes when every node that was sent a start message
+ * has reported stagelet completion; when no further stage exists, the job is
+ * terminated and cleanup is fanned out to all nodes used in the attempt.
+ */
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+    stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
+
+stagefinish(JobId, StageNumber, Attempt, SSet) :-
+    startmessage_agg(JobId, StageId, Attempt, _, TSet),
+    stageletcomplete_agg(JobId, StageId, Attempt, SSet),
+    jobstage(JobId, StageNumber, StageId),
+    TSet.size() == SSet.size();
+
+update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+    stagestart#insert(JobId, StageNumber, Attempt),
+    stagefinish(JobId, _, Attempt, SSet),
+    notin jobstage(JobId, StageNumber);
+
+define(jobcleanup_agg, {UUID, Set});
+
+jobcleanup_agg(JobId, set<NodeId>) :-
+    stagestart#insert(JobId, StageNumber, Attempt),
+    stagefinish(JobId, _, Attempt, _),
+    attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
+    notin jobstage(JobId, StageNumber);
+
+jobcleanup(JobId, NodeIdSet) :-
+    jobcleanup_agg(JobId, NodeIdSet);
\ No newline at end of file