Merged online_aggregation @186:220

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@221 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
index 4ba5869..c739a5e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -14,5 +14,22 @@
  */
 package edu.uci.ics.hyracks.api.application;
 
+import java.io.Serializable;
+
+/**
+ * View of a deployed application that is common to the cluster controller and the node controllers.
+ */
 public interface IApplicationContext {
+    /**
+     * @return the class loader associated with this application
+     */
+    ClassLoader getClassLoader();
+
+    /**
+     * Returns the application's distributed state. The method name is misspelled ("Destributed"), but it is
+     * part of the published interface and is kept as is for existing callers.
+     *
+     * @return the distributed state object
+     */
+    Serializable getDestributedState();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
index ceae504..efbf0df 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
@@ -18,6 +18,4 @@
     public void start() throws Exception;
 
     public void stop() throws Exception;
-
-    public void setApplicationContext(IApplicationContext appCtx);
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
new file mode 100644
index 0000000..a431939
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
+
+/**
+ * Application context exposed to code running on the cluster controller.
+ */
+public interface ICCApplicationContext extends IApplicationContext {
+    /**
+     * Sets the application's distributed state. The cluster controller serializes the distributed state and
+     * hands it to node controllers when the application is started.
+     *
+     * @param state
+     *            the state to distribute
+     */
+    void setDistributedState(Serializable state);
+
+    /**
+     * Replaces the factory used to turn the bytes submitted with a job into a job specification.
+     *
+     * @param jobSpecFactory
+     *            the factory to use for subsequently created jobs
+     */
+    void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory);
+
+    /**
+     * Registers a listener for job lifecycle notifications of this application.
+     *
+     * @param jobLifecycleListener
+     *            the listener to add
+     */
+    void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java
new file mode 100644
index 0000000..3acbe5e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Bootstrap hook for the cluster-controller side of an application.
+ */
+public interface ICCBootstrap extends IBootstrap {
+    /**
+     * Supplies the cluster-controller application context to this bootstrap.
+     *
+     * @param appCtx
+     *            the context of the application this bootstrap belongs to
+     */
+    void setApplicationContext(ICCApplicationContext appCtx);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
new file mode 100644
index 0000000..9e63446
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.Serializable;
+
+/**
+ * Application context exposed to code running on a node controller.
+ */
+public interface INCApplicationContext extends IApplicationContext {
+    /**
+     * Sets the application's distributed state on this node.
+     *
+     * @param state
+     *            the new distributed state
+     */
+    void setDistributedState(Serializable state);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java
new file mode 100644
index 0000000..4aa1ac3
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Bootstrap hook for the node-controller side of an application.
+ */
+public interface INCBootstrap extends IBootstrap {
+    /**
+     * Supplies the node-controller application context to this bootstrap.
+     *
+     * @param appCtx
+     *            the context of the application this bootstrap belongs to
+     */
+    void setApplicationContext(INCApplicationContext appCtx);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
index 753079b..779e9ac 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/AbstractHyracksConnection.java
@@ -14,10 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.client;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.util.EnumSet;
 import java.util.UUID;
 
@@ -31,6 +28,7 @@
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 
 abstract class AbstractHyracksConnection implements IHyracksClientConnection {
     private final String ccHost;
@@ -73,14 +71,7 @@
 
     @Override
     public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        return hci.createJob(appName, serialize(jobSpec), jobFlags);
-    }
-
-    private byte[] serialize(JobSpecification jobSpec) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
-        oos.writeObject(jobSpec);
-        return baos.toByteArray();
+        return hci.createJob(appName, JavaSerializationUtils.serialize(jobSpec), jobFlags);
     }
 
     @Override
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
index 535c925..d90ac60 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksContext.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.api.context;
 
+import java.util.UUID;
+
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IResourceManager;
 
@@ -23,4 +25,6 @@
     public int getFrameSize();
 
     public ICounterContext getCounterContext();
+
+    public UUID getJobId();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
index cd8c067..94e5bd3 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/control/INodeController.java
@@ -49,7 +49,7 @@
 
     public void notifyRegistration(IClusterController ccs) throws Exception;
 
-    public void createApplication(String appName, boolean deployHar) throws Exception;
+    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState) throws Exception;
 
     public void destroyApplication(String appName) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
new file mode 100644
index 0000000..c2f207e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.util.UUID;
+
+/**
+ * Callback interface for observing the lifecycle of jobs.
+ */
+public interface IJobLifecycleListener {
+    /**
+     * Invoked when a job has been created.
+     *
+     * @param jobId
+     *            the id of the new job
+     * @param jobSpec
+     *            the specification the job was created from
+     */
+    void notifyJobCreation(UUID jobId, JobSpecification jobSpec);
+
+    /**
+     * Invoked when a job starts.
+     *
+     * @param jobId
+     *            the id of the job
+     */
+    void notifyJobStart(UUID jobId);
+
+    /**
+     * Invoked when a job finishes.
+     *
+     * @param jobId
+     *            the id of the job
+     */
+    void notifyJobFinish(UUID jobId);
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSpecificationFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSpecificationFactory.java
new file mode 100644
index 0000000..722aaae
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobSpecificationFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/**
+ * Strategy for building a {@link JobSpecification} from the bytes submitted with a job.
+ */
+public interface IJobSpecificationFactory {
+    /**
+     * Builds the job specification described by the given bytes.
+     *
+     * @param bytes
+     *            the job description as submitted
+     * @param bootstrap
+     *            the cluster-controller bootstrap of the application
+     * @param appCtx
+     *            the cluster-controller context of the application
+     * @return the job specification to run
+     * @throws HyracksException
+     *             if no specification can be produced from the bytes
+     */
+    JobSpecification createJobSpecification(byte[] bytes, ICCBootstrap bootstrap, ICCApplicationContext appCtx)
+            throws HyracksException;
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index e724deb..fce9c22 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -46,6 +46,8 @@
 
     private final Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap;
 
+    private final Map<String, Serializable> properties;
+
     public JobSpecification() {
         roots = new ArrayList<OperatorDescriptorId>();
         opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
@@ -53,6 +55,7 @@
         opInputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
         opOutputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
         connectorOpMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
+        properties = new HashMap<String, Serializable>();
     }
 
     public void addRoot(IOperatorDescriptor op) {
@@ -68,6 +71,14 @@
                         new Pair<IOperatorDescriptor, Integer>(producerOp, producerPort),
                         new Pair<IOperatorDescriptor, Integer>(consumerOp, consumerPort)));
     }
+    
+    public void setProperty(String name, Serializable value) {
+        properties.put(name, value);
+    }
+    
+    public Serializable getProperty(String name) {
+        return properties.get(name);
+    }
 
     private <T> void extend(List<T> list, int index) {
         int n = list.size();
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
new file mode 100644
index 0000000..05a75f9
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
+
+/**
+ * Helpers for converting objects to and from their Java-serialized byte form.
+ */
+public class JavaSerializationUtils {
+    /**
+     * Serializes an object into a byte array using standard Java serialization.
+     *
+     * @param jobSpec
+     *            the object to serialize; despite the name, any {@link Serializable} is accepted
+     * @return the serialized form of the object
+     * @throws IOException
+     *             if the object graph cannot be written
+     */
+    public static byte[] serialize(Serializable jobSpec) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        try {
+            oos.writeObject(jobSpec);
+        } finally {
+            // Closing flushes whatever the object stream still buffers into baos.
+            oos.close();
+        }
+        return baos.toByteArray();
+    }
+
+    /**
+     * Reconstructs an object from its Java-serialized form, resolving classes through the given class loader.
+     *
+     * @param bytes
+     *            the serialized form, as produced by {@link #serialize(Serializable)}
+     * @param classLoader
+     *            the loader consulted first when resolving classes named in the stream
+     * @return the deserialized object
+     * @throws IOException
+     *             if the byte stream cannot be read as a serialized object
+     * @throws ClassNotFoundException
+     *             if a class named in the stream cannot be resolved
+     */
+    public static Object deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
+        ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
+        try {
+            return ois.readObject();
+        } finally {
+            ois.close();
+        }
+    }
+
+    /**
+     * An {@link ObjectInputStream} that resolves classes through a caller-supplied class loader.
+     */
+    private static class ClassLoaderObjectInputStream extends ObjectInputStream {
+        private final ClassLoader classLoader;
+
+        protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException,
+                SecurityException {
+            super(in);
+            this.classLoader = classLoader;
+        }
+
+        @Override
+        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+            try {
+                return Class.forName(desc.getName(), false, classLoader);
+            } catch (ClassNotFoundException e) {
+                // Class.forName cannot resolve primitive type names such as "int"; the default
+                // implementation can, so defer to it before giving up.
+                return super.resolveClass(desc);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index e411517..a9fd347 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -24,7 +24,6 @@
 import java.rmi.registry.LocateRegistry;
 import java.rmi.registry.Registry;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -56,7 +55,6 @@
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.handler.AbstractHandler;
 import org.eclipse.jetty.server.handler.ContextHandler;
-import org.json.JSONArray;
 import org.json.JSONObject;
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
@@ -74,6 +72,8 @@
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.cc.web.handlers.util.IJSONOutputFunction;
 import edu.uci.ics.hyracks.control.cc.web.handlers.util.JSONOutputRequestHandler;
@@ -92,7 +92,7 @@
 
     private final Map<String, NodeControllerState> nodeRegistry;
 
-    private final Map<String, ApplicationContext> applications;
+    private final Map<String, CCApplicationContext> applications;
 
     private final ServerContext serverCtx;
 
@@ -111,7 +111,7 @@
     public ClusterControllerService(CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
-        applications = new Hashtable<String, ApplicationContext>();
+        applications = new Hashtable<String, CCApplicationContext>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
                 ClusterControllerService.class.getName()));
         Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
@@ -148,11 +148,19 @@
 
     @Override
     public UUID createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        ApplicationContext appCtx = applications.get(appName);
+        CCApplicationContext appCtx = getApplicationContext(appName);
         if (appCtx == null) {
             throw new HyracksException("No application with id " + appName + " found");
         }
-        return jobManager.createJob(appName, (JobSpecification) appCtx.deserialize(jobSpec), jobFlags);
+        UUID jobId = UUID.randomUUID();
+        JobSpecification specification = appCtx.createJobSpecification(jobId, jobSpec);
+        jobManager.createJob(jobId, appName, specification, jobFlags);
+        appCtx.notifyJobCreation(jobId, specification);
+        return jobId;
+    }
+
+    public synchronized CCApplicationContext getApplicationContext(String appName) {
+        return applications.get(appName);
     }
 
     @Override
@@ -328,7 +336,7 @@
                     }
                     String appName = parts[0];
                     ApplicationContext appCtx;
-                    appCtx = applications.get(appName);
+                    appCtx = getApplicationContext(appName);
                     if (appCtx != null) {
                         if (HttpMethods.PUT.equals(request.getMethod())) {
                             OutputStream os = appCtx.getHarOutputStream();
@@ -384,7 +392,7 @@
             if (applications.containsKey(appName)) {
                 throw new HyracksException("Duplicate application with name: " + appName + " being created.");
             }
-            ApplicationContext appCtx = new ApplicationContext(serverCtx, appName);
+            CCApplicationContext appCtx = new CCApplicationContext(serverCtx, appName);
             applications.put(appName, appCtx);
         }
     }
@@ -404,8 +412,10 @@
 
     @Override
     public void startApplication(final String appName) throws Exception {
-        ApplicationContext appCtx = applications.get(appName);
+        ApplicationContext appCtx = getApplicationContext(appName);
+        appCtx.initializeClassPath();
         appCtx.initialize();
+        final byte[] serializedDistributedState = JavaSerializationUtils.serialize(appCtx.getDestributedState());
         final boolean deployHar = appCtx.containsHar();
         RemoteOp<Void>[] ops;
         synchronized (this) {
@@ -419,7 +429,7 @@
 
                     @Override
                     public Void execute(INodeController node) throws Exception {
-                        node.createApplication(appName, deployHar);
+                        node.createApplication(appName, deployHar, serializedDistributedState);
                         return null;
                     }
                 });
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
index 1f14b3b..88076c9 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/IJobManager.java
@@ -23,7 +23,8 @@
 import edu.uci.ics.hyracks.api.job.JobStatus;
 
 public interface IJobManager {
-    public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+    public void createJob(UUID jobId, String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+            throws Exception;
 
     public void start(UUID jobId) throws Exception;
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
index 9ed4f00..2eabaf1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/JOLJobManagerImpl.java
@@ -60,6 +60,7 @@
 import edu.uci.ics.hyracks.api.job.JobPlan;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
 
 public class JOLJobManagerImpl implements IJobManager {
     private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
@@ -302,7 +303,8 @@
                             try {
                                 Object[] data = t.toArray();
                                 UUID jobId = (UUID) data[0];
-                                Set<String> ts = (Set<String>) data[1];
+                                String appName = (String) data[1];
+                                Set<String> ts = (Set<String>) data[2];
                                 ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
                                         .size()];
                                 int i = 0;
@@ -317,6 +319,10 @@
                                     jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
                                     jolRuntime.evaluate();
                                 }
+                                CCApplicationContext appCtx = ccs.getApplicationContext(appName);
+                                if (appCtx != null) {
+                                    appCtx.notifyJobFinish(jobId);
+                                }
                             } catch (Exception e) {
                             }
                         }
@@ -378,9 +384,8 @@
     }
 
     @Override
-    public UUID createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        final UUID jobId = UUID.randomUUID();
-
+    public void createJob(final UUID jobId, String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
+            throws Exception {
         final JobPlanBuilder builder = new JobPlanBuilder();
         builder.init(jobSpec, jobFlags);
 
@@ -443,8 +448,6 @@
         jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, abTuples, null);
 
         jolRuntime.evaluate();
-
-        return jobId;
     }
 
     private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
@@ -868,7 +871,7 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] { UUID.class, Set.class };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, String.class, Set.class };
 
         public JobCleanUpTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
new file mode 100644
index 0000000..8ced75d
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -0,0 +1,114 @@
+package edu.uci.ics.hyracks.control.cc.application;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.job.DeserializingJobSpecificationFactory;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+/**
+ * Cluster-controller side application context. It owns the job specification factory and the job lifecycle
+ * listeners registered for one application.
+ */
+public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+    private IJobSpecificationFactory jobSpecFactory;
+
+    // Guarded by this: registration and the notify* iterations all synchronize on the context.
+    private final List<IJobLifecycleListener> jobLifecycleListeners;
+
+    /**
+     * Creates a context that deserializes job specifications until another factory is installed.
+     *
+     * @param serverCtx
+     *            the server context, passed through to the base class
+     * @param appName
+     *            the application name, passed through to the base class
+     * @throws IOException
+     *             if the base class constructor fails
+     */
+    public CCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+        super(serverCtx, appName);
+        jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
+        jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
+    }
+
+    /** Hands this context to the bootstrap and then starts it. */
+    @Override
+    protected void start() throws Exception {
+        ((ICCBootstrap) bootstrap).setApplicationContext(this);
+        bootstrap.start();
+    }
+
+    @Override
+    public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory) {
+        this.jobSpecFactory = jobSpecFactory;
+    }
+
+    /**
+     * Builds a job specification from submitted bytes using the current factory.
+     *
+     * @param jobId
+     *            the id assigned to the job; currently not forwarded to the factory
+     * @param bytes
+     *            the submitted job description
+     * @return the job specification produced by the factory
+     * @throws HyracksException
+     *             if the factory cannot produce a specification
+     */
+    public JobSpecification createJobSpecification(UUID jobId, byte[] bytes) throws HyracksException {
+        return jobSpecFactory.createJobSpecification(bytes, (ICCBootstrap) bootstrap, this);
+    }
+
+    /** Stops the bootstrap if one is present. */
+    @Override
+    protected void stop() throws Exception {
+        if (bootstrap != null) {
+            bootstrap.stop();
+        }
+    }
+
+    @Override
+    public void setDistributedState(Serializable state) {
+        this.distributedState = state;
+    }
+
+    /**
+     * Registers a lifecycle listener. Synchronized so that a registration cannot modify the list while one of
+     * the notify methods is iterating over it.
+     */
+    @Override
+    public synchronized void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener) {
+        jobLifecycleListeners.add(jobLifecycleListener);
+    }
+
+    /** Forwards a job-start notification to every registered listener. */
+    public synchronized void notifyJobStart(UUID jobId) {
+        for (IJobLifecycleListener l : jobLifecycleListeners) {
+            l.notifyJobStart(jobId);
+        }
+    }
+
+    /** Forwards a job-finish notification to every registered listener. */
+    public synchronized void notifyJobFinish(UUID jobId) {
+        for (IJobLifecycleListener l : jobLifecycleListeners) {
+            l.notifyJobFinish(jobId);
+        }
+    }
+
+    /** Forwards a job-creation notification to every registered listener. */
+    public synchronized void notifyJobCreation(UUID jobId, JobSpecification specification) {
+        for (IJobLifecycleListener l : jobLifecycleListeners) {
+            l.notifyJobCreation(jobId, specification);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
new file mode 100644
index 0000000..4df524c
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/DeserializingJobSpecificationFactory.java
@@ -0,0 +1,40 @@
+package edu.uci.ics.hyracks.control.cc.job;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+
+/**
+ * Job specification factory that treats the submitted bytes as a Java-serialized {@link JobSpecification}.
+ */
+public class DeserializingJobSpecificationFactory implements IJobSpecificationFactory {
+    /** The single shared instance; the constructor is private. */
+    public static final IJobSpecificationFactory INSTANCE = new DeserializingJobSpecificationFactory();
+
+    private DeserializingJobSpecificationFactory() {
+    }
+
+    /**
+     * Deserializes the bytes with the application's class loader and returns the result as a job specification.
+     * I/O and class-resolution failures are rethrown wrapped in a {@link HyracksException}.
+     */
+    @Override
+    public JobSpecification createJobSpecification(byte[] bytes, ICCBootstrap bootstrap, ICCApplicationContext appCtx)
+            throws HyracksException {
+        final ClassLoader appClassLoader = appCtx.getClassLoader();
+        final Object spec;
+        try {
+            spec = JavaSerializationUtils.deserialize(bytes, appClassLoader);
+        } catch (IOException ioe) {
+            throw new HyracksException(ioe);
+        } catch (ClassNotFoundException cnfe) {
+            throw new HyracksException(cnfe);
+        }
+        return (JobSpecification) spec;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
index a717128..8174098 100644
--- a/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
+++ b/hyracks-control-cc/src/main/resources/edu/uci/ics/hyracks/control/cc/scheduler.olg
@@ -324,13 +324,14 @@
         NewStats.putAll(SMap);
     };
 
-define(jobcleanup_agg, {UUID, Set});
+define(jobcleanup_agg, {UUID, String, Set});
 
-jobcleanup_agg(JobId, set<NodeId>) :-
+jobcleanup_agg(JobId, AppName, set<NodeId>) :-
     stagestart#insert(JobId, StageNumber, Attempt),
     stagefinish(JobId, _, Attempt, _),
     attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
+    job(JobId, AppName, _, _, _, _),
     notin jobstage(JobId, StageNumber);
 
-jobcleanup(JobId, NodeIdSet) :-
-    jobcleanup_agg(JobId, NodeIdSet);
+jobcleanup(JobId, AppName, NodeIdSet) :-
+    jobcleanup_agg(JobId, AppName, NodeIdSet);
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
index c499ac9..6a494aa 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -14,14 +14,13 @@
  */
 package edu.uci.ics.hyracks.control.common.application;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -39,20 +38,22 @@
 
 import edu.uci.ics.hyracks.api.application.IApplicationContext;
 import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
-public class ApplicationContext implements IApplicationContext {
+public abstract class ApplicationContext implements IApplicationContext {
     private static final String APPLICATION_ROOT = "applications";
     private static final String CLUSTER_CONTROLLER_BOOTSTRAP_CLASS_KEY = "cc.bootstrap.class";
     private static final String NODE_CONTROLLER_BOOTSTRAP_CLASS_KEY = "nc.bootstrap.class";
 
-    private ServerContext serverCtx;
-    private final String appName;
-    private final File applicationRootDir;
-    private ClassLoader classLoader;
-    private ApplicationStatus status;
-    private Properties deploymentDescriptor;
-    private IBootstrap bootstrap;
+    protected ServerContext serverCtx;
+    protected final String appName;
+    protected final File applicationRootDir;
+    protected ClassLoader classLoader;
+    protected ApplicationStatus status;
+    protected Properties deploymentDescriptor;
+    protected IBootstrap bootstrap;
+    protected Serializable distributedState;
 
     public ApplicationContext(ServerContext serverCtx, String appName) throws IOException {
         this.serverCtx = serverCtx;
@@ -67,10 +68,7 @@
         return appName;
     }
 
-    public void initialize() throws Exception {
-        if (status != ApplicationStatus.CREATED) {
-            throw new IllegalStateException();
-        }
+    public void initializeClassPath() throws Exception {
         if (expandArchive()) {
             File expandedFolder = getExpandedFolder();
             List<URL> urls = new ArrayList<URL>();
@@ -83,7 +81,16 @@
             });
             classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]));
             deploymentDescriptor = parseDeploymentDescriptor();
+        } else {
+            classLoader = getClass().getClassLoader();
+        }
+    }
 
+    public void initialize() throws Exception {
+        if (status != ApplicationStatus.CREATED) {
+            throw new IllegalStateException();
+        }
+        if (expandArchive()) {
             String bootstrapClass = null;
             switch (serverCtx.getServerType()) {
                 case CLUSTER_CONTROLLER: {
@@ -97,15 +104,16 @@
             }
             if (bootstrapClass != null) {
                 bootstrap = (IBootstrap) classLoader.loadClass(bootstrapClass).newInstance();
-                bootstrap.setApplicationContext(this);
-                bootstrap.start();
+                start();
             }
-        } else {
-            classLoader = getClass().getClassLoader();
         }
         status = ApplicationStatus.INITIALIZED;
     }
 
+    protected abstract void start() throws Exception;
+
+    protected abstract void stop() throws Exception;
+
     private void findJarFiles(File dir, List<URL> urls) throws MalformedURLException {
         for (File f : dir.listFiles()) {
             if (f.isDirectory()) {
@@ -161,16 +169,20 @@
 
+    /**
+     * Marks the application as deinitialized, stops it and deletes its expanded folder.
+     * The folder is removed even when {@link #stop()} throws, so a failing shutdown does not leak files.
+     */
     public void deinitialize() throws Exception {
         status = ApplicationStatus.DEINITIALIZED;
-        if (bootstrap != null) {
-            bootstrap.stop();
-        }
-        File expandedFolder = getExpandedFolder();
-        FileUtils.deleteDirectory(expandedFolder);
+        try {
+            stop();
+        } finally {
+            File expandedFolder = getExpandedFolder();
+            FileUtils.deleteDirectory(expandedFolder);
+        }
     }
 
     public Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
-        ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
-        return ois.readObject();
+        return JavaSerializationUtils.deserialize(bytes, classLoader);
     }
 
     public OutputStream getHarOutputStream() throws IOException {
@@ -188,4 +193,14 @@
     public boolean containsHar() {
         return getArchiveFile().exists();
     }
+
+    @Override
+    public Serializable getDestributedState() {
+        return distributedState;
+    }
+
+    @Override
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ClassLoaderObjectInputStream.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ClassLoaderObjectInputStream.java
deleted file mode 100644
index 1ae29ba..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ClassLoaderObjectInputStream.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package edu.uci.ics.hyracks.control.common.application;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
-
-public class ClassLoaderObjectInputStream extends ObjectInputStream {
-    private ClassLoader classLoader;
-
-    protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException,
-            SecurityException {
-        super(in);
-        this.classLoader = classLoader;
-    }
-
-    protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
-        return Class.forName(desc.getName(), false, classLoader);
-    }
-}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3690ed7..9733d94 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -17,6 +17,7 @@
 import java.io.File;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.rmi.registry.LocateRegistry;
@@ -76,6 +77,7 @@
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
 import edu.uci.ics.hyracks.control.nc.comm.ConnectionManager;
 import edu.uci.ics.hyracks.control.nc.comm.DemuxDataReceiveListenerFactory;
 import edu.uci.ics.hyracks.control.nc.runtime.DelegateHyracksContext;
@@ -107,7 +109,7 @@
 
     private final ServerContext serverCtx;
 
-    private final Map<String, ApplicationContext> applications;
+    private final Map<String, NCApplicationContext> applications;
 
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
@@ -123,7 +125,7 @@
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
                 NodeControllerService.class.getName()), id));
-        applications = new Hashtable<String, ApplicationContext>();
+        applications = new Hashtable<String, NCApplicationContext>();
     }
 
     private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -218,7 +220,8 @@
             Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
             joblet.setStagelet(stageId, stagelet);
 
-            IHyracksContext stageletContext = new DelegateHyracksContext(ctx, stagelet.getStageletCounterContext());
+            IHyracksContext stageletContext = new DelegateHyracksContext(ctx, jobId,
+                    stagelet.getStageletCounterContext());
 
             final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
             Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
@@ -328,7 +331,7 @@
 
             final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
 
-            final IHyracksContext stageletContext = new DelegateHyracksContext(ctx,
+            final IHyracksContext stageletContext = new DelegateHyracksContext(ctx, jobId,
                     stagelet.getStageletCounterContext());
 
             final JobSpecification spec = plan.getJobSpecification();
@@ -554,13 +557,14 @@
     }
 
     @Override
-    public void createApplication(String appName, boolean deployHar) throws Exception {
-        ApplicationContext appCtx;
+    public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+            throws Exception {
+        NCApplicationContext appCtx;
         synchronized (applications) {
             if (applications.containsKey(appName)) {
                 throw new HyracksException("Duplicate application with name: " + appName + " being created.");
             }
-            appCtx = new ApplicationContext(serverCtx, appName);
+            appCtx = new NCApplicationContext(serverCtx, appName);
             applications.put(appName, appCtx);
         }
         if (deployHar) {
@@ -577,6 +581,8 @@
                 is.close();
             }
         }
+        appCtx.initializeClassPath();
+        appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
         appCtx.initialize();
     }
 
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
new file mode 100644
index 0000000..c894b4a
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -0,0 +1,45 @@
+package edu.uci.ics.hyracks.control.nc.application;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+/**
+ * Node-controller side {@link ApplicationContext}: stores the distributed state handed to it and drives the
+ * lifecycle of the application's bootstrap object.
+ */
+public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+    public NCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+        super(serverCtx, appName);
+    }
+
+    /** Replaces the distributed state exposed through {@code getDestributedState()}. */
+    @Override
+    public void setDistributedState(Serializable state) {
+        distributedState = state;
+    }
+
+    /**
+     * Hands this context to the bootstrap object and starts it.
+     * Does nothing when no bootstrap was created, mirroring {@link #stop()}.
+     */
+    @Override
+    protected void start() throws Exception {
+        if (bootstrap != null) {
+            ((INCBootstrap) bootstrap).setApplicationContext(this);
+            bootstrap.start();
+        }
+    }
+
+    /** Stops the bootstrap object, if one was created. */
+    @Override
+    protected void stop() throws Exception {
+        if (bootstrap != null) {
+            bootstrap.stop();
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
index 12c1a76..963c2e7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/DelegateHyracksContext.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.nc.runtime;
 
+import java.util.UUID;
+
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IResourceManager;
@@ -23,9 +25,12 @@
 
     private final ICounterContext counterContext;
 
-    public DelegateHyracksContext(IHyracksContext delegate, ICounterContext counterContext) {
+    private final UUID jobId;
+
+    public DelegateHyracksContext(IHyracksContext delegate, UUID jobId, ICounterContext counterContext) {
         this.delegate = delegate;
         this.counterContext = counterContext;
+        this.jobId = jobId;
     }
 
     @Override
@@ -39,6 +44,11 @@
     }
 
     @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
     public ICounterContext getCounterContext() {
         return counterContext;
     }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
index 7eddfc3..fb7a01d 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.nc.runtime;
 
+import java.util.UUID;
+
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
 import edu.uci.ics.hyracks.api.resources.IResourceManager;
@@ -41,4 +43,9 @@
     public ICounterContext getCounterContext() {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public UUID getJobId() {
+        throw new UnsupportedOperationException();
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
index ed34fcb..0d9e2b4 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/util/FrameUtils.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
@@ -38,4 +39,9 @@
         buffer.position(0);
         buffer.limit(buffer.capacity());
     }
+
+    public static int getAbsoluteFieldStartOffset(IFrameTupleAccessor accessor, int tuple, int field) {
+        return accessor.getTupleStartOffset(tuple) + accessor.getFieldSlotsLength()
+                + accessor.getFieldStartOffset(tuple, field);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
new file mode 100644
index 0000000..71a3110
--- /dev/null
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.hadoop.data;
+
+import java.io.DataInputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+
+/**
+ * {@link ITuplePartitionComputerFactory} that delegates partitioning to a Hadoop {@link Partitioner}.
+ * <p>
+ * For every tuple, field 0 is deserialized as the key and field 1 as the value; both are then passed to
+ * {@code Partitioner.getPartition} on the delegate {@code instance}.
+ *
+ * @param <K> Hadoop key type
+ * @param <V> Hadoop value type
+ */
+public class HadoopNewPartitionerTuplePartitionComputerFactory<K extends Writable, V extends Writable> extends
+        AbstractClassBasedDelegate<Partitioner<K, V>> implements ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final ISerializerDeserializer<K> keyIO;
+    private final ISerializerDeserializer<V> valueIO;
+
+    /**
+     * @param klass Hadoop partitioner class to delegate to
+     * @param keyIO (de)serializer used for the key read from field 0
+     * @param valueIO (de)serializer used for the value read from field 1
+     */
+    public HadoopNewPartitionerTuplePartitionComputerFactory(Class<? extends Partitioner<K, V>> klass,
+            ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
+        super(klass);
+        this.keyIO = keyIO;
+        this.valueIO = valueIO;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        return new ITuplePartitionComputer() {
+            // Reused across calls: the stream is re-pointed at each field before it is deserialized.
+            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
+            private final DataInputStream dis = new DataInputStream(bbis);
+
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                int keyStart = FrameUtils.getAbsoluteFieldStartOffset(accessor, tIndex, 0);
+                bbis.setByteBuffer(accessor.getBuffer(), keyStart);
+                K key = keyIO.deserialize(dis);
+                int valueStart = FrameUtils.getAbsoluteFieldStartOffset(accessor, tIndex, 1);
+                bbis.setByteBuffer(accessor.getBuffer(), valueStart);
+                V value = valueIO.deserialize(dis);
+                return instance.getPartition(key, value, nParts);
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
index d06d6ff..4efa614 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/data/WritableComparingBinaryComparatorFactory.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.hadoop.data;
 
-import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.RawComparator;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -23,15 +23,15 @@
 public class WritableComparingBinaryComparatorFactory<T> implements IBinaryComparatorFactory {
     private static final long serialVersionUID = 1L;
 
-    private Class<? extends WritableComparator> cmpClass;
+    private Class<? extends RawComparator<T>> cmpClass;
 
-    public WritableComparingBinaryComparatorFactory(Class<? extends WritableComparator> cmpClass) {
+    public WritableComparingBinaryComparatorFactory(Class<? extends RawComparator<T>> cmpClass) {
         this.cmpClass = cmpClass;
     }
 
     @Override
     public IBinaryComparator createBinaryComparator() {
-        final WritableComparator instance = ReflectionUtils.createInstance(cmpClass);
+        final RawComparator<T> instance = ReflectionUtils.createInstance(cmpClass);
         return new IBinaryComparator() {
             @Override
             public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
diff --git a/hyracks-dataflow-std/.classpath b/hyracks-dataflow-std/.classpath
index 86f50f4..31cf404 100644
--- a/hyracks-dataflow-std/.classpath
+++ b/hyracks-dataflow-std/.classpath
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
-	<classpathentry kind="src" path="src/test/java"/>
+	<classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
 	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
 	<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 228d22b..36a49ee 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -15,24 +15,13 @@
 package edu.uci.ics.hyracks.dataflow.std.sort;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksContext;
 import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -40,15 +29,10 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
 
 public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String FRAMESORTER = "framesorter";
@@ -104,61 +88,29 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            final FrameSorter frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
-                    comparatorFactories, recordDescriptors[0]);
-            final int maxSortFrames = framesLimit - 1;
+            final ExternalSortRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields,
+                    firstKeyNormalizerFactory, comparatorFactories, recordDescriptors[0], framesLimit);
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
-                private LinkedList<File> runs;
-
                 @Override
                 public void open() throws HyracksDataException {
-                    runs = new LinkedList<File>();
-                    frameSorter.reset();
+                    runGen.open();
                 }
 
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                    if (frameSorter.getFrameCount() >= maxSortFrames) {
-                        flushFramesToRun();
-                    }
-                    frameSorter.insertFrame(buffer);
+                    runGen.nextFrame(buffer);
                 }
 
                 @Override
                 public void close() throws HyracksDataException {
-                    if (frameSorter.getFrameCount() > 0) {
-                        if (runs.size() <= 0) {
-                            frameSorter.sortFrames();
-                            env.set(FRAMESORTER, frameSorter);
-                        } else {
-                            flushFramesToRun();
-                        }
-                    }
-                    env.set(RUNS, runs);
-                }
-
-                private void flushFramesToRun() throws HyracksDataException {
-                    frameSorter.sortFrames();
-                    File runFile;
-                    try {
-                        runFile = ctx.getResourceManager().createFile(
-                                ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e);
-                    }
-                    RunFileWriter writer = new RunFileWriter(runFile);
-                    writer.open();
-                    try {
-                        frameSorter.flushFrames(writer);
-                    } finally {
-                        writer.close();
-                    }
-                    frameSorter.reset();
-                    runs.add(runFile);
+                    runGen.close();
+                    env.set(FRAMESORTER, runGen.getFrameSorter());
+                    env.set(RUNS, runGen.getRuns());
                 }
 
                 @Override
                 public void flush() throws HyracksDataException {
+                    runGen.flush();
                 }
             };
             return op;
@@ -176,283 +128,19 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
                 IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; ++i) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
-            }
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
-                private List<ByteBuffer> inFrames;
-                private ByteBuffer outFrame;
-                LinkedList<File> runs;
-                private FrameTupleAppender outFrameAppender;
-
                 @Override
                 public void initialize() throws HyracksDataException {
-                    runs = (LinkedList<File>) env.get(RUNS);
-                    writer.open();
-                    try {
-                        if (runs.size() <= 0) {
-                            FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
-                            if (frameSorter != null) {
-                                frameSorter.flushFrames(writer);
-                            }
-                            env.set(FRAMESORTER, null);
-                        } else {
-                            inFrames = new ArrayList<ByteBuffer>();
-                            outFrame = ctx.getResourceManager().allocateFrame();
-                            outFrameAppender = new FrameTupleAppender(ctx);
-                            outFrameAppender.reset(outFrame, true);
-                            for (int i = 0; i < framesLimit - 1; ++i) {
-                                inFrames.add(ctx.getResourceManager().allocateFrame());
-                            }
-                            int passCount = 0;
-                            while (runs.size() > 0) {
-                                passCount++;
-                                try {
-                                    doPass(runs, passCount);
-                                } catch (Exception e) {
-                                    throw new HyracksDataException(e);
-                                }
-                            }
-                        }
-                    } finally {
-                        writer.close();
-                    }
+                    List<File> runs = (List<File>) env.get(RUNS); // stored by the run-generating operator's close()
+                    FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER); // sorter filled during run generation
+                    ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
+                            comparatorFactories, recordDescriptors[0], framesLimit, writer);
+                    merger.process(); // process(true): opens the writer, emits the sorted output, closes the writer
+                    env.set(FRAMESORTER, null); // clear both environment entries once the output has been written
                     env.set(RUNS, null);
                 }
-
-                // creates a new run from runs that can fit in memory.
-                private void doPass(LinkedList<File> runs, int passCount) throws HyracksDataException, IOException {
-                    File newRun = null;
-                    IFrameWriter writer = this.writer;
-                    boolean finalPass = false;
-                    if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
-                        finalPass = true;
-                        for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
-                            inFrames.remove(i);
-                        }
-                    } else {
-                        newRun = ctx.getResourceManager().createFile(
-                                ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
-                        writer = new RunFileWriter(newRun);
-                        writer.open();
-                    }
-                    try {
-                        RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
-                        FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
-                        Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
-                        ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDescriptors[0],
-                                inFrames.size(), comparator);
-                        int[] tupleIndexes = new int[inFrames.size()];
-                        for (int i = 0; i < inFrames.size(); i++) {
-                            tupleIndexes[i] = 0;
-                            int runIndex = topTuples.peek().getRunid();
-                            runCursors[runIndex] = new RunFileReader(runs.get(runIndex));
-                            runCursors[runIndex].open();
-                            if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
-                                tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDescriptors[0]);
-                                tupleAccessors[runIndex].reset(inFrames.get(runIndex));
-                                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-                            } else {
-                                closeRun(runIndex, runCursors, tupleAccessors);
-                            }
-                        }
-
-                        while (!topTuples.areRunsExhausted()) {
-                            ReferenceEntry top = topTuples.peek();
-                            int runIndex = top.getRunid();
-                            FrameTupleAccessor fta = top.getAccessor();
-                            int tupleIndex = top.getTupleIndex();
-
-                            if (!outFrameAppender.append(fta, tupleIndex)) {
-                                FrameUtils.flushFrame(outFrame, writer);
-                                outFrameAppender.reset(outFrame, true);
-                                if (!outFrameAppender.append(fta, tupleIndex)) {
-                                    throw new IllegalStateException();
-                                }
-                            }
-
-                            ++tupleIndexes[runIndex];
-                            setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
-                        }
-                        if (outFrameAppender.getTupleCount() > 0) {
-                            FrameUtils.flushFrame(outFrame, writer);
-                            outFrameAppender.reset(outFrame, true);
-                        }
-                        runs.subList(0, inFrames.size()).clear();
-                        if (!finalPass) {
-                            runs.add(0, newRun);
-                        }
-                    } finally {
-                        if (!finalPass) {
-                            writer.close();
-                        }
-                    }
-                }
-
-                private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
-                        FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
-                    boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-                    if (exists) {
-                        topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
-                    } else {
-                        topTuples.pop();
-                        closeRun(runIndex, runCursors, tupleAccessors);
-                    }
-                }
-
-                private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
-                        FrameTupleAccessor[] tupleAccessors) throws IOException {
-                    if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
-                        return false;
-                    } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
-                        ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
-                        if (runCursors[runIndex].nextFrame(buf)) {
-                            tupleIndexes[runIndex] = 0;
-                            return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
-                        } else {
-                            return false;
-                        }
-                    } else {
-                        return true;
-                    }
-                }
-
-                private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
-                        throws HyracksDataException {
-                    runCursors[index].close();
-                    runCursors[index] = null;
-                    tupleAccessor[index] = null;
-                }
             };
             return op;
         }
     }
-
-    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
-        return new Comparator<ReferenceEntry>() {
-            public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
-                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
-                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
-                int j1 = (Integer) tp1.getTupleIndex();
-                int j2 = (Integer) tp2.getTupleIndex();
-                byte[] b1 = fta1.getBuffer().array();
-                byte[] b2 = fta2.getBuffer().array();
-                for (int f = 0; f < sortFields.length; ++f) {
-                    int fIdx = sortFields[f];
-                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
-                            + fta1.getFieldStartOffset(j1, fIdx);
-                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
-                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
-                            + fta2.getFieldStartOffset(j2, fIdx);
-                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
-                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
-                    if (c != 0) {
-                        return c;
-                    }
-                }
-                return 0;
-            }
-        };
-    }
-
-    private class RunFileWriter implements IFrameWriter {
-        private final File file;
-        private FileChannel channel;
-
-        public RunFileWriter(File file) {
-            this.file = file;
-        }
-
-        @Override
-        public void open() throws HyracksDataException {
-            RandomAccessFile raf;
-            try {
-                raf = new RandomAccessFile(file, "rw");
-            } catch (FileNotFoundException e) {
-                throw new HyracksDataException(e);
-            }
-            channel = raf.getChannel();
-        }
-
-        @Override
-        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            int remain = buffer.capacity();
-            while (remain > 0) {
-                int len;
-                try {
-                    len = channel.write(buffer);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-                if (len < 0) {
-                    throw new HyracksDataException("Error writing data");
-                }
-                remain -= len;
-            }
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            if (channel != null) {
-                try {
-                    channel.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-
-        @Override
-        public void flush() throws HyracksDataException {
-        }
-    }
-
-    public static class RunFileReader implements IFrameReader {
-        private final File file;
-        private FileChannel channel;
-
-        public RunFileReader(File file) throws FileNotFoundException {
-            this.file = file;
-        }
-
-        @Override
-        public void open() throws HyracksDataException {
-            RandomAccessFile raf;
-            try {
-                raf = new RandomAccessFile(file, "r");
-            } catch (FileNotFoundException e) {
-                throw new HyracksDataException(e);
-            }
-            channel = raf.getChannel();
-        }
-
-        @Override
-        public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            buffer.clear();
-            int remain = buffer.capacity();
-            while (remain > 0) {
-                int len;
-                try {
-                    len = channel.read(buffer);
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-                if (len < 0) {
-                    return false;
-                }
-                remain -= len;
-            }
-            return true;
-        }
-
-        @Override
-        public void close() throws HyracksDataException {
-            try {
-                channel.close();
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
new file mode 100644
index 0000000..e2a08c2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Run-generation phase of the external sort: buffers incoming frames in a {@link FrameSorter} and, whenever a
+ * frame arrives while {@code framesLimit - 1} frames are already buffered, first sorts the buffered frames and
+ * writes them to a new run file.
+ * <p>
+ * After {@link #close()} the data is either still inside the sorter (sorted, when no run file was written) or
+ * entirely in the run files listed by {@link #getRuns()}.
+ */
+public class ExternalSortRunGenerator implements IFrameWriter {
+    private final IHyracksContext ctx;
+    private final FrameSorter frameSorter;
+    private final List<File> runs;
+    private final int maxSortFrames;
+
+    /**
+     * @param framesLimit
+     *            frame budget; at most {@code framesLimit - 1} frames are buffered before spilling to a run
+     */
+    public ExternalSortRunGenerator(IHyracksContext ctx, int[] sortFields,
+            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDesc, int framesLimit) {
+        this.ctx = ctx;
+        frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc);
+        runs = new LinkedList<File>();
+        maxSortFrames = framesLimit - 1;
+    }
+
+    /** Forgets any previously recorded runs and resets the sorter. */
+    @Override
+    public void open() throws HyracksDataException {
+        runs.clear();
+        frameSorter.reset();
+    }
+
+    /** Buffers the frame in the sorter, spilling to a new run first if the sorter is already full. */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        boolean sorterFull = frameSorter.getFrameCount() >= maxSortFrames;
+        if (sorterFull) {
+            spillSortedRun();
+        }
+        frameSorter.insertFrame(buffer);
+    }
+
+    /**
+     * Finishes run generation. Buffered frames are sorted in place when no run file exists yet; otherwise they
+     * are spilled as one more run. Does nothing when the sorter holds no frames.
+     */
+    @Override
+    public void close() throws HyracksDataException {
+        if (frameSorter.getFrameCount() <= 0) {
+            return;
+        }
+        if (runs.isEmpty()) {
+            frameSorter.sortFrames();
+        } else {
+            spillSortedRun();
+        }
+    }
+
+    /** No-op. */
+    @Override
+    public void flush() throws HyracksDataException {
+    }
+
+    public FrameSorter getFrameSorter() {
+        return frameSorter;
+    }
+
+    /** Returns the live list of run files written so far; new runs are appended at the end. */
+    public List<File> getRuns() {
+        return runs;
+    }
+
+    /** Sorts the buffered frames, writes them to a fresh run file, then resets the sorter. */
+    private void spillSortedRun() throws HyracksDataException {
+        frameSorter.sortFrames();
+        File runFile = createRunFile();
+        RunFileWriter runWriter = new RunFileWriter(runFile);
+        runWriter.open();
+        try {
+            frameSorter.flushFrames(runWriter);
+        } finally {
+            runWriter.close();
+        }
+        frameSorter.reset();
+        runs.add(runFile);
+    }
+
+    /** Asks the resource manager for a new ".run" file. */
+    private File createRunFile() throws HyracksDataException {
+        try {
+            return ctx.getResourceManager().createFile(ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
new file mode 100644
index 0000000..91114b2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+/**
+ * Merge phase of the external sort. When no run file exists, the frames held by the in-memory {@link FrameSorter}
+ * are flushed to the output writer; otherwise the sorted run files are merged, with up to {@code framesLimit - 1}
+ * input frames and one output frame, in as many passes as needed.
+ */
+public class ExternalSortRunMerger {
+    private final IHyracksContext ctx;
+    private final FrameSorter frameSorter;
+    private final List<File> runs;
+    private final int[] sortFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDesc;
+    private final int framesLimit;
+    private final IFrameWriter writer;
+    private List<ByteBuffer> inFrames;
+    private ByteBuffer outFrame;
+    private FrameTupleAppender outFrameAppender;
+
+    /**
+     * @param frameSorter
+     *            sorter whose frames are emitted when {@code runs} is empty; may be {@code null}
+     * @param runs
+     *            sorted run files to merge; the list is modified while merging
+     * @param framesLimit
+     *            number of frames the merge may use: the input frames plus one output frame
+     * @param writer
+     *            receives the fully sorted output
+     */
+    public ExternalSortRunMerger(IHyracksContext ctx, FrameSorter frameSorter, List<File> runs, int[] sortFields,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, int framesLimit,
+            IFrameWriter writer) {
+        this.ctx = ctx;
+        this.frameSorter = frameSorter;
+        this.runs = runs;
+        this.sortFields = sortFields;
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.recordDesc = recordDesc;
+        this.framesLimit = framesLimit;
+        this.writer = writer;
+    }
+
+    /**
+     * Runs the merge.
+     * <p>
+     * With {@code doFinalPass == true} the writer is opened, the sorted output is written to it and it is closed
+     * again. With {@code doFinalPass == false} only intermediate passes run: merging stops as soon as the remaining
+     * runs (at most {@code framesLimit - 1}) could be merged by a single final pass, and this method neither opens
+     * nor closes the writer.
+     * @throws HyracksDataException
+     *             if reading or writing fails, or if an intermediate pass is required but fewer than two input
+     *             frames are available ({@code framesLimit < 3})
+     */
+    public void process(boolean doFinalPass) throws HyracksDataException {
+        if (doFinalPass) {
+            writer.open();
+        }
+        try {
+            if (runs.size() <= 0) {
+                // NOTE(review): this branch writes to the writer even when doFinalPass is false, i.e. without this
+                // method having opened it -- confirm that such callers open the writer themselves.
+                if (frameSorter != null) {
+                    frameSorter.flushFrames(writer);
+                }
+            } else {
+                inFrames = new ArrayList<ByteBuffer>();
+                outFrame = ctx.getResourceManager().allocateFrame();
+                outFrameAppender = new FrameTupleAppender(ctx);
+                outFrameAppender.reset(outFrame, true);
+                for (int i = 0; i < framesLimit - 1; ++i) {
+                    inFrames.add(ctx.getResourceManager().allocateFrame());
+                }
+                int passCount = 0;
+                while (runs.size() > 0) {
+                    boolean fitsInFinalPass = runs.size() + 1 <= framesLimit; // + 1 outFrame
+                    if (fitsInFinalPass && !doFinalPass) {
+                        // doPass() would return without consuming a run, so looping on would never terminate.
+                        break;
+                    }
+                    if (!fitsInFinalPass && inFrames.size() < 2) {
+                        // An intermediate pass replaces inFrames.size() runs by one new run; with fewer than two
+                        // input frames the number of runs would never shrink.
+                        throw new HyracksDataException("Cannot merge " + runs.size()
+                                + " runs with a frame limit of " + framesLimit);
+                    }
+                    passCount++;
+                    try {
+                        doPass(runs, passCount, doFinalPass);
+                    } catch (HyracksDataException e) {
+                        // Already the declared exception type: do not wrap it a second time.
+                        throw e;
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+        } finally {
+            if (doFinalPass) {
+                writer.close();
+            }
+        }
+    }
+
+    /** Equivalent to {@code process(true)}. */
+    public void process() throws HyracksDataException {
+        process(true);
+    }
+
+    /**
+     * Performs one merge pass. If all remaining runs fit into the frame budget, this is the final pass: the runs
+     * are merged into the output writer, or nothing is done when {@code doFinalPass} is false. Otherwise the first
+     * {@code inFrames.size()} runs are merged into a new run file, which replaces them at the head of {@code runs}.
+     */
+    private void doPass(List<File> runs, int passCount, boolean doFinalPass) throws HyracksDataException, IOException {
+        File newRun = null;
+        IFrameWriter writer = this.writer;
+        boolean finalPass = false;
+        if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+            if (!doFinalPass) {
+                return;
+            }
+            finalPass = true;
+            for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+                inFrames.remove(i);
+            }
+        } else {
+            newRun = ctx.getResourceManager().createFile(ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
+            writer = new RunFileWriter(newRun);
+            writer.open();
+        }
+        RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
+        try {
+            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+            Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDesc, inFrames.size(),
+                    comparator);
+            int[] tupleIndexes = new int[inFrames.size()];
+            for (int i = 0; i < inFrames.size(); i++) {
+                tupleIndexes[i] = 0;
+                int runIndex = topTuples.peek().getRunid();
+                RunFileReader reader = new RunFileReader(runs.get(runIndex));
+                reader.open();
+                // Track the reader only once it is open, so that cleanup never closes an unopened reader.
+                runCursors[runIndex] = reader;
+                if (reader.nextFrame(inFrames.get(runIndex))) {
+                    tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDesc);
+                    tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+                    setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+                } else {
+                    closeRun(runIndex, runCursors, tupleAccessors);
+                }
+            }
+
+            while (!topTuples.areRunsExhausted()) {
+                ReferenceEntry top = topTuples.peek();
+                int runIndex = top.getRunid();
+                FrameTupleAccessor fta = top.getAccessor();
+                int tupleIndex = top.getTupleIndex();
+
+                if (!outFrameAppender.append(fta, tupleIndex)) {
+                    FrameUtils.flushFrame(outFrame, writer);
+                    outFrameAppender.reset(outFrame, true);
+                    if (!outFrameAppender.append(fta, tupleIndex)) {
+                        throw new IllegalStateException("Tuple does not fit into an empty output frame");
+                    }
+                }
+
+                ++tupleIndexes[runIndex];
+                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+            }
+            if (outFrameAppender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outFrame, writer);
+                outFrameAppender.reset(outFrame, true);
+            }
+            runs.subList(0, inFrames.size()).clear();
+            if (!finalPass) {
+                runs.add(0, newRun);
+            }
+        } finally {
+            try {
+                // Closes any reader that is still open, e.g. because the pass failed part-way.
+                closeOpenRuns(runCursors);
+            } finally {
+                if (!finalPass) {
+                    writer.close();
+                }
+            }
+        }
+    }
+
+    /** Closes every reader still open in {@code runCursors}; rethrows the first close failure, if any. */
+    private void closeOpenRuns(RunFileReader[] runCursors) throws HyracksDataException {
+        HyracksDataException firstFailure = null;
+        for (int i = 0; i < runCursors.length; ++i) {
+            if (runCursors[i] != null) {
+                try {
+                    runCursors[i].close();
+                } catch (HyracksDataException e) {
+                    if (firstFailure == null) {
+                        firstFailure = e;
+                    }
+                }
+                runCursors[i] = null;
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * If run {@code runIndex} has another tuple, passes it to {@code topTuples.popAndReplace}; otherwise pops the
+     * queue and closes the run.
+     */
+    private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
+        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+        if (exists) {
+            topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
+        } else {
+            topTuples.pop();
+            closeRun(runIndex, runCursors, tupleAccessors);
+        }
+    }
+
+    /**
+     * Tells whether run {@code runIndex} has a tuple at its current index, reading the run's next frame (into the
+     * accessor's buffer) whenever the current frame is used up.
+     */
+    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors) throws IOException {
+        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+            return false;
+        } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+            ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
+            if (runCursors[runIndex].nextFrame(buf)) {
+                tupleIndexes[runIndex] = 0;
+                return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+            } else {
+                return false;
+            }
+        } else {
+            return true;
+        }
+    }
+
+    /** Closes the reader of run {@code index} and clears its reader and accessor slots. */
+    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+            throws HyracksDataException {
+        runCursors[index].close();
+        runCursors[index] = null;
+        tupleAccessor[index] = null;
+    }
+
+    /** Builds a comparator that orders queue entries by the sort fields of the tuples they reference. */
+    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+        return new Comparator<ReferenceEntry>() {
+            public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+                int j1 = tp1.getTupleIndex();
+                int j2 = tp2.getTupleIndex();
+                byte[] b1 = fta1.getBuffer().array();
+                byte[] b2 = fta2.getBuffer().array();
+                for (int f = 0; f < sortFields.length; ++f) {
+                    int fIdx = sortFields[f];
+                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                            + fta1.getFieldStartOffset(j1, fIdx);
+                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                            + fta2.getFieldStartOffset(j2, fIdx);
+                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    if (c != 0) {
+                        return c;
+                    }
+                }
+                return 0;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java
new file mode 100644
index 0000000..f31100e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Sequentially reads the frames of a run file: {@link #open()} opens the file read-only and every
+ * {@link #nextFrame(ByteBuffer)} call fills one buffer from it.
+ */
+public class RunFileReader implements IFrameReader {
+    private final File file;
+    private FileChannel channel;
+
+    public RunFileReader(File file) throws FileNotFoundException {
+        this.file = file;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        RandomAccessFile raf;
+        try {
+            raf = new RandomAccessFile(file, "r");
+        } catch (FileNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+        channel = raf.getChannel();
+    }
+
+    /**
+     * Clears {@code buffer} and fills it to capacity from the file.
+     * @return {@code true} if the buffer was filled completely, {@code false} if the end of the file was reached
+     *         first (the buffer may then contain a partial frame)
+     */
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        buffer.clear();
+        int remain = buffer.capacity();
+        while (remain > 0) {
+            int len;
+            try {
+                len = channel.read(buffer);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            if (len < 0) {
+                return false;
+            }
+            remain -= len;
+        }
+        return true;
+    }
+
+    /** Closes the underlying channel; does nothing if {@link #open()} never succeeded. */
+    @Override
+    public void close() throws HyracksDataException {
+        if (channel == null) {
+            // open() was never called, or failed before the channel was created: nothing to release.
+            return;
+        }
+        try {
+            channel.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java
new file mode 100644
index 0000000..fd0a03e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Writes frames sequentially to a run file through a FileChannel that {@link #open()} creates. */
+public class RunFileWriter implements IFrameWriter {
+    private final File file;
+    private FileChannel channel;
+
+    public RunFileWriter(File file) {
+        this.file = file;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        try {
+            channel = new RandomAccessFile(file, "rw").getChannel(); // "rw" creates the file when it is absent
+        } catch (FileNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    /** Writes the bytes between the buffer's position and its limit, leaving the position at the limit. */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Loop on the buffer's own remaining count; counting down from capacity() never ends for a partial buffer.
+        while (buffer.hasRemaining()) {
+            int written;
+            try {
+                written = channel.write(buffer);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            if (written < 0) {
+                throw new HyracksDataException("Error writing data");
+            }
+        }
+    }
+
+    /** Closes the channel if open() created one; otherwise does nothing. */
+    @Override
+    public void close() throws HyracksDataException {
+        if (channel != null) {
+            try {
+                channel.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java
index 4be1349..e87d8bd 100644
--- a/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java
+++ b/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/NCBootstrap.java
@@ -2,14 +2,14 @@
 
 import java.util.logging.Logger;
 
-import edu.uci.ics.hyracks.api.application.IApplicationContext;
-import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
 
-public class NCBootstrap implements IBootstrap {
+public class NCBootstrap implements INCBootstrap {
     private static final Logger LOGGER = Logger.getLogger(NCBootstrap.class.getName());
 
-    private IApplicationContext appCtx;
-    
+    private INCApplicationContext appCtx;
+
     @Override
     public void start() throws Exception {
         LOGGER.info("Starting NC Bootstrap");
@@ -24,7 +24,7 @@
     }
 
     @Override
-    public void setApplicationContext(IApplicationContext appCtx) {
+    public void setApplicationContext(INCApplicationContext appCtx) {
         this.appCtx = appCtx;
     }
-}
+}
\ No newline at end of file