IMRU: new API and hello-world example

Adds an object-level IMRUJob API on top of a reworked byte-level IMRUJob2,
plus the plumbing (ASyncIO, JobSpecification2) needed to run both.
Almost working; one bug left.
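
For reference, a minimal hello-world against the new object-level API could
look like the sketch below. HelloModel, HelloWorldJob and the submission
notes are hypothetical; the overridden methods are exactly the abstract
methods of the IMRUJob class added in this patch. If IMRUJobControl bounds
its Model parameter to IModel, HelloModel would implement IModel as well.

    import java.io.InputStream;
    import java.io.Serializable;
    import java.nio.ByteBuffer;
    import java.util.Iterator;

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.imru.api2.IMRUJob;

    // Hypothetical model type; any Serializable state works here.
    class HelloModel implements Serializable {
        int roundsDone;
    }

    public class HelloWorldJob extends IMRUJob<HelloModel, String> {
        @Override
        public HelloModel initModel() {
            return new HelloModel();
        }

        @Override
        public int getCachedDataFrameSize() {
            return 4 * 1024;
        }

        @Override
        public void parse(IHyracksTaskContext ctx, InputStream in,
                IFrameWriter writer) throws HyracksDataException {
            // Turn the raw input split into tuple frames; see the parse()
            // implementation in BGDMain2 below for a complete example.
        }

        @Override
        public String map(Iterator<ByteBuffer> input, HelloModel model,
                int cachedDataFrameSize) throws HyracksDataException {
            int frames = 0;
            while (input.hasNext()) {
                input.next();
                frames++;
            }
            return "hello from " + frames + " frames";
        }

        @Override
        public String reduce(Iterator<String> input)
                throws HyracksDataException {
            StringBuilder sb = new StringBuilder();
            while (input.hasNext())
                sb.append(input.next()).append(' ');
            return sb.toString();
        }

        @Override
        public void update(Iterator<String> input, HelloModel model)
                throws HyracksDataException {
            while (input.hasNext())
                System.out.println(input.next());
            model.roundsDone++;
        }

        @Override
        public boolean shouldTerminate(HelloModel model) {
            return model.roundsDone >= 1;
        }
    }

Submission then mirrors BGDMain2: connect an IMRUJobControl, pick an
aggregation strategy (e.g. selectNoAggregation), and call
run(new HelloWorldJob(), tempPath, app).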


git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_imru@2604 123451ca-8445-de46-9d55-352943316053
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction.java
index d914cad..1702967 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction.java
@@ -1,6 +1,8 @@
 package edu.uci.ics.hyracks.imru.api;
 
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction2.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction2.java
new file mode 100644
index 0000000..970854c
--- /dev/null
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction2.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.imru.api;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IMapFunction2 {
+    void map(Iterator<ByteBuffer> input, IFrameWriter writer) throws HyracksDataException;
+}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunctionFactory.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunctionFactory.java
index 6eab539..a0a66eb 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunctionFactory.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunctionFactory.java
@@ -3,5 +3,7 @@
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 
 public interface IMapFunctionFactory<Model extends IModel> {
+    boolean useAPI2();
     IMapFunction createMapFunction(IHyracksTaskContext ctx, int cachedDataFrameSize, Model model);
+    IMapFunction2 createMapFunction2(IHyracksTaskContext ctx, int cachedDataFrameSize, Model model);
 }
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/ASyncIO.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/ASyncIO.java
new file mode 100644
index 0000000..9fc35f3
--- /dev/null
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/ASyncIO.java
@@ -0,0 +1,67 @@
+package edu.uci.ics.hyracks.imru.api2;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
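+/**
+ * Hands data from the frame-pushing thread to a consumer thread: add() queues
+ * a chunk, getInput() returns a blocking iterator over the queued chunks, and
+ * close() marks end-of-input so the iterator can finish.
+ */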
+public class ASyncIO {
+    private LinkedList<byte[]> queue = new LinkedList<byte[]>();
+    private boolean more = true;
+
+    public ASyncIO() {
+    }
+
+    public void close() throws HyracksDataException {
+        synchronized (queue) {
+            more = false; // written inside the lock so waiting consumers see it
+            queue.notifyAll();
+        }
+    }
+
+    public void add(byte[] data) throws HyracksDataException {
+        synchronized (queue) {
+            queue.addLast(data);
+            queue.notifyAll();
+        }
+    }
+
+    public Iterator<byte[]> getInput() {
+        return new Iterator<byte[]>() {
+            byte[] data;
+
+            @Override
+            public void remove() {
+            }
+
+            @Override
+            public byte[] next() {
+                if (!hasNext())
+                    return null;
+                byte[] data2 = data;
+                data = null;
+                return data2;
+            }
+
+            @Override
+            public boolean hasNext() {
+                try {
+                    if (data == null) {
+                        synchronized (queue) {
+                            while (queue.size() == 0 && more) {
+                                queue.wait();
+                            }
+                            if (queue.size() > 0)
+                                data = queue.removeFirst();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // preserve the interrupt status
+                }
+                return data != null;
+            }
+        };
+    }
+}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob.java
new file mode 100644
index 0000000..226f1b8
--- /dev/null
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.imru.api2;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
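+/**
+ * Object-level IMRU job: map() folds a partition's cached frames into one
+ * IntermediateResult, reduce() combines intermediate results, and update()
+ * applies the combined result to the model once per iteration.
+ */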
+public abstract class IMRUJob<Model, IntermediateResult> implements
+        Serializable {
+    public abstract IntermediateResult map(Iterator<ByteBuffer> input,
+            Model model, int cachedDataFrameSize) throws HyracksDataException;
+
+    public abstract IntermediateResult reduce(Iterator<IntermediateResult> input)
+            throws HyracksDataException;
+
+    public abstract void update(Iterator<IntermediateResult> input, Model model)
+            throws HyracksDataException;
+
+    public abstract Model initModel();
+
+    public abstract int getCachedDataFrameSize();
+
+    public abstract void parse(IHyracksTaskContext ctx, InputStream in,
+            IFrameWriter writer) throws HyracksDataException;
+
+    public abstract boolean shouldTerminate(Model model);
+
+}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob2.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob2.java
index c2d2899..2d86880 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob2.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob2.java
@@ -1,57 +1,32 @@
 package edu.uci.ics.hyracks.imru.api2;
 
-import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-import edu.uci.ics.hyracks.imru.api.IModel;
-import edu.uci.ics.hyracks.imru.base.IJobFactory;
-import edu.uci.ics.hyracks.imru.deserialized.AbstractDeserializingIMRUJobSpecification;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedMapFunction;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedMapFunctionFactory;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedReduceFunction;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedReduceFunctionFactory;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunction;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunctionFactory;
-import edu.uci.ics.hyracks.imru.example.bgd2.LinearModel;
-import edu.uci.ics.hyracks.imru.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.hyracks.imru.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.hyracks.imru.runtime.IMRUDriver;
 
-public interface IMRUJob2<Model extends IModel, T extends Serializable>
-        extends Serializable {
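+/**
+ * Byte-level IMRU job: intermediate results travel as raw bytes through
+ * OutputStreams and Iterator<byte[]>, replacing the per-chunk
+ * openMap/map/closeMap-style callbacks of the old interface.
+ */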
+public abstract class IMRUJob2<Model> implements Serializable {
+    public abstract void map(Iterator<ByteBuffer> input, Model model,
+            OutputStream output, int cachedDataFrameSize) throws HyracksDataException;
+
+    public abstract void reduce(IHyracksTaskContext ctx,
+            Iterator<byte[]> input, OutputStream output)
+            throws HyracksDataException;
+
+    public abstract void update(IHyracksTaskContext ctx,
+            Iterator<byte[]> input, Model model) throws HyracksDataException;
+
+    public abstract Model initModel();
+
     public abstract int getCachedDataFrameSize();
 
-    public abstract ITupleParserFactory getTupleParserFactory();
+    public abstract void parse(IHyracksTaskContext ctx, InputStream in,
+            IFrameWriter writer) throws HyracksDataException;
 
     public abstract boolean shouldTerminate(Model model);
-
-    public abstract void openMap(Model model, int cachedDataFrameSize)
-            throws HyracksDataException;
-
-    public abstract void map(ByteBuffer input, Model model,
-            int cachedDataFrameSize) throws HyracksDataException;
-
-    public abstract T closeMap(Model model, int cachedDataFrameSize)
-            throws HyracksDataException;
-
-    public abstract void openReduce() throws HyracksDataException;
-
-    public abstract void reduce(T input) throws HyracksDataException;
-
-    public abstract T closeReduce() throws HyracksDataException;
-
-    public abstract void openUpdate(Model model) throws HyracksDataException;
-
-    public abstract void update(T input, Model model)
-            throws HyracksDataException;
-
-    public abstract void closeUpdate(Model model) throws HyracksDataException;
 }
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
index bb0062a..afebdc9 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
@@ -1,17 +1,42 @@
 package edu.uci.ics.hyracks.imru.api2;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.imru.api.IIMRUJobSpecification;
+import edu.uci.ics.hyracks.imru.api.IMapFunction;
+import edu.uci.ics.hyracks.imru.api.IMapFunction2;
+import edu.uci.ics.hyracks.imru.api.IMapFunctionFactory;
 import edu.uci.ics.hyracks.imru.api.IModel;
+import edu.uci.ics.hyracks.imru.api.IReassemblingReduceFunction;
+import edu.uci.ics.hyracks.imru.api.IReassemblingUpdateFunction;
+import edu.uci.ics.hyracks.imru.api.IReduceFunction;
+import edu.uci.ics.hyracks.imru.api.IReduceFunctionFactory;
+import edu.uci.ics.hyracks.imru.api.IUpdateFunction;
+import edu.uci.ics.hyracks.imru.api.IUpdateFunctionFactory;
 import edu.uci.ics.hyracks.imru.base.IJobFactory;
 import edu.uci.ics.hyracks.imru.deserialized.AbstractDeserializingIMRUJobSpecification;
 import edu.uci.ics.hyracks.imru.deserialized.IDeserializedMapFunction;
@@ -20,7 +45,11 @@
 import edu.uci.ics.hyracks.imru.deserialized.IDeserializedReduceFunctionFactory;
 import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunction;
 import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunctionFactory;
+import edu.uci.ics.hyracks.imru.example.bgd.R;
 import edu.uci.ics.hyracks.imru.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.hyracks.imru.jobgen.GenericAggregationIMRUJobFactory;
+import edu.uci.ics.hyracks.imru.jobgen.NAryAggregationIMRUJobFactory;
+import edu.uci.ics.hyracks.imru.jobgen.NoAggregationIMRUJobFactory;
 import edu.uci.ics.hyracks.imru.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.hyracks.imru.runtime.IMRUDriver;
 
@@ -28,6 +57,9 @@
     public HyracksConnection hcc;
     public Configuration conf = new Configuration();
     public ConfigurationFactory confFactory;
+    IJobFactory jobFactory;
+    private static final int BYTES_IN_INT = 4;
+    private static ExecutorService threadPool = Executors.newCachedThreadPool();
 
     public void connect(String ccHost, int ccPort, String hadoopConfPath,
             String clusterConfPath) throws Exception {
@@ -45,19 +77,52 @@
         confFactory = new ConfigurationFactory(conf);
     }
 
+    public void selectNoAggregation(String examplePaths) {
+        jobFactory = new NoAggregationIMRUJobFactory(examplePaths, confFactory);
+    }
+
+    public void selectGenericAggregation(String examplePaths, int aggCount) {
+        if (aggCount < 1)
+            throw new IllegalArgumentException(
+                    "Must specify a nonnegative aggregator count using the -agg-count option");
+        jobFactory = new GenericAggregationIMRUJobFactory(examplePaths,
+                confFactory, aggCount);
+    }
+
+    public void selectNAryAggregation(String examplePaths, int fanIn) {
+        if (fanIn < 1) {
+            throw new IllegalArgumentException(
+                    "Must specify nonnegative -fan-in");
+        }
+        jobFactory = new NAryAggregationIMRUJobFactory(examplePaths,
+                confFactory, fanIn);
+    }
+
     IMRUDriver<Model> driver;
 
-    static class JobSpecification<Model extends IModel, T extends Serializable>
+    static class JobSpecificationTmp<Model extends IModel, T extends Serializable>
             extends AbstractDeserializingIMRUJobSpecification<Model, T> {
-        IMRUJob2<Model, T> job2;
+        IMRUJobTmp<Model, T> job2;
 
-        public JobSpecification(IMRUJob2<Model, T> job2) {
+        public JobSpecificationTmp(IMRUJobTmp<Model, T> job2) {
             this.job2 = job2;
         }
 
         @Override
         public ITupleParserFactory getTupleParserFactory() {
-            return job2.getTupleParserFactory();
+            return new ITupleParserFactory() {
+                @Override
+                public ITupleParser createTupleParser(
+                        final IHyracksTaskContext ctx) {
+                    return new ITupleParser() {
+                        @Override
+                        public void parse(InputStream in, IFrameWriter writer)
+                                throws HyracksDataException {
+                            job2.parse(ctx, in, writer);
+                        }
+                    };
+                }
+            };
         }
 
         @Override
@@ -147,15 +212,311 @@
         }
     }
 
-    public JobStatus run(IMRUJob2<Model, T> job2, Model initialModel,
-            IJobFactory jobFactory, String tempPath, String app)
-            throws Exception {
-        JobSpecification<Model, T> job = new JobSpecification<Model, T>(job2);
+    public JobStatus run(IMRUJobTmp<Model, T> job2, Model initialModel,
+            String tempPath, String app) throws Exception {
+        JobSpecificationTmp<Model, T> job = new JobSpecificationTmp<Model, T>(
+                job2);
         driver = new IMRUDriver<Model>(hcc, job, initialModel, jobFactory,
                 conf, tempPath, app);
         return driver.run();
     }
 
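+    /**
+     * Adapts an IMRUJob2 to the low-level IIMRUJobSpecification: map output is
+     * cut into frames by serializeToFrames(), while reduce and update run on
+     * the shared thread pool and consume reassembled chunks through ASyncIO.
+     */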
+    static class JobSpecification2<Model extends IModel> implements
+            IIMRUJobSpecification<Model> {
+        IMRUJob2<Model> job2;
+
+        public JobSpecification2(IMRUJob2<Model> job2) {
+            this.job2 = job2;
+        }
+
+        @Override
+        public ITupleParserFactory getTupleParserFactory() {
+            return new ITupleParserFactory() {
+                @Override
+                public ITupleParser createTupleParser(
+                        final IHyracksTaskContext ctx) {
+                    return new ITupleParser() {
+                        @Override
+                        public void parse(InputStream in, IFrameWriter writer)
+                                throws HyracksDataException {
+                            job2.parse(ctx, in, writer);
+                        }
+                    };
+                }
+            };
+        }
+
+        @Override
+        public int getCachedDataFrameSize() {
+            return job2.getCachedDataFrameSize();
+        }
+
+        public boolean shouldTerminate(Model model) {
+            return job2.shouldTerminate(model);
+        }
+
+        @Override
+        public IMapFunctionFactory<Model> getMapFunctionFactory() {
+            return new IMapFunctionFactory<Model>() {
+                @Override
+                public boolean useAPI2() {
+                    return true;
+                }
+
+                public IMapFunction2 createMapFunction2(
+                        final IHyracksTaskContext ctx,
+                        final int cachedDataFrameSize, final Model model) {
+                    return new IMapFunction2() {
+                        @Override
+                        public void map(Iterator<ByteBuffer> input,
+                                IFrameWriter writer)
+                                throws HyracksDataException {
+                            ByteArrayOutputStream out = new ByteArrayOutputStream();
+                            job2.map(input, model, out, cachedDataFrameSize);
+                            byte[] objectData = out.toByteArray();
+                            serializeToFrames(ctx, writer, objectData);
+                        }
+                    };
+                }
+
+                @Override
+                public IMapFunction createMapFunction(
+                        final IHyracksTaskContext ctx,
+                        final int cachedDataFrameSize, final Model model) {
+                    return null;
+                }
+            };
+        }
+
+        @Override
+        public IReduceFunctionFactory getReduceFunctionFactory() {
+            return new IReduceFunctionFactory() {
+                @Override
+                public IReduceFunction createReduceFunction(
+                        final IHyracksTaskContext ctx) {
+                    return new IReassemblingReduceFunction() {
+                        private IFrameWriter writer;
+                        private ASyncIO io;
+
+                        @Override
+                        public void setFrameWriter(IFrameWriter writer) {
+                            this.writer = writer;
+                        }
+
+                        @Override
+                        public void open() throws HyracksDataException {
+                            io = new ASyncIO();
+                            threadPool.execute(new Runnable() {
+                                @Override
+                                public void run() {
+                                    Iterator<byte[]> input = io.getInput();
+                                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                                    try {
+                                        job2.reduce(ctx, input, out);
+                                        byte[] objectData = out.toByteArray();
+                                        serializeToFrames(ctx, writer,
+                                                objectData);
+                                    } catch (HyracksDataException e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+
+                        @Override
+                        public void close() throws HyracksDataException {
+                            io.close();
+                        }
+
+                        @Override
+                        public void reduce(List<ByteBuffer> chunks)
+                                throws HyracksDataException {
+                            byte[] data = deserializeFromChunks(ctx, chunks);
+                            io.add(data);
+                        }
+                    };
+                }
+            };
+        }
+
+        @Override
+        public IUpdateFunctionFactory<Model> getUpdateFunctionFactory() {
+            return new IUpdateFunctionFactory<Model>() {
+
+                @Override
+                public IUpdateFunction createUpdateFunction(
+                        final IHyracksTaskContext ctx, final Model model) {
+                    return new IReassemblingUpdateFunction() {
+                        private ASyncIO io;
+
+                        @Override
+                        public void open() throws HyracksDataException {
+                            io = new ASyncIO();
+                            threadPool.execute(new Runnable() {
+                                @Override
+                                public void run() {
+                                    Iterator<byte[]> input = io.getInput();
+                                    try {
+                                        job2.update(ctx, input, model);
+                                    } catch (HyracksDataException e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+
+                        @Override
+                        public void close() throws HyracksDataException {
+                            io.close();
+                        }
+
+                        @Override
+                        public void update(List<ByteBuffer> chunks)
+                                throws HyracksDataException {
+                            byte[] data = deserializeFromChunks(ctx, chunks);
+                            io.add(data);
+                        }
+                    };
+                }
+            };
+        }
+    }
+
+    public JobStatus run2(IMRUJob2<Model> job2, Model initialModel,
+            String tempPath, String app) throws Exception {
+        JobSpecification2<Model> job = new JobSpecification2<Model>(job2);
+        driver = new IMRUDriver<Model>(hcc, job, initialModel, jobFactory,
+                conf, tempPath, app);
+        return driver.run();
+    }
+
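+    /**
+     * Wraps an object-level IMRUJob as a byte-level IMRUJob2 by Java-serializing
+     * each intermediate result and deserializing it again on the reduce and
+     * update sides.
+     */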
+    static class Job<Model extends IModel, T extends Serializable> extends
+            IMRUJob2<Model> {
+        IMRUJob<Model, T> job;
+
+        public Job(IMRUJob<Model, T> job) {
+            this.job = job;
+        }
+
+        @Override
+        public int getCachedDataFrameSize() {
+            return job.getCachedDataFrameSize();
+        }
+
+        @Override
+        public Model initModel() {
+            return job.initModel();
+        }
+
+        @Override
+        public void map(Iterator<ByteBuffer> input, Model model,
+                OutputStream output, int cachedDataFrameSize)
+                throws HyracksDataException {
+            T object = job.map(input, model, cachedDataFrameSize);
+            byte[] objectData;
+            try {
+                objectData = JavaSerializationUtils.serialize(object);
+                output.write(objectData);
+                output.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public void parse(IHyracksTaskContext ctx, InputStream in,
+                IFrameWriter writer) throws HyracksDataException {
+            job.parse(ctx, in, writer);
+        }
+
+        @Override
+        public void reduce(final IHyracksTaskContext ctx,
+                final Iterator<byte[]> input, OutputStream output)
+                throws HyracksDataException {
+            Iterator<T> iterator = new Iterator<T>() {
+                @Override
+                public void remove() {
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return input.hasNext();
+                }
+
+                @Override
+                public T next() {
+                    byte[] objectData = input.next();
+                    if (objectData == null)
+                        return null;
+                    NCApplicationContext appContext = (NCApplicationContext) ctx
+                            .getJobletContext().getApplicationContext();
+                    try {
+                        return (T) appContext.deserialize(objectData);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                    return null;
+                }
+            };
+            T object = job.reduce(iterator);
+            byte[] objectData;
+            try {
+                objectData = JavaSerializationUtils.serialize(object);
+                output.write(objectData);
+                output.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public boolean shouldTerminate(Model model) {
+            return job.shouldTerminate(model);
+        }
+
+        @Override
+        public void update(final IHyracksTaskContext ctx,
+                final Iterator<byte[]> input, Model model)
+                throws HyracksDataException {
+            Iterator<T> iterator = new Iterator<T>() {
+                @Override
+                public void remove() {
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return input.hasNext();
+                }
+
+                @Override
+                public T next() {
+                    byte[] objectData = input.next();
+                    if (objectData == null)
+                        return null;
+                    NCApplicationContext appContext = (NCApplicationContext) ctx
+                            .getJobletContext().getApplicationContext();
+                    try {
+                        return (T) appContext.deserialize(objectData);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                    return null;
+                }
+            };
+            job.update(iterator, model);
+        }
+    }
+
+    public JobStatus run(final IMRUJob<Model, T> job, String tempPath,
+            String app) throws Exception {
+        Model initialModel = job.initModel();
+        JobSpecification2<Model> job2 = new JobSpecification2<Model>(
+                new Job<Model, T>(job));
+        driver = new IMRUDriver<Model>(hcc, job2, initialModel, jobFactory,
+                conf, tempPath, app);
+        return driver.run();
+    }
+
     /**
      * @return The number of iterations performed.
      */
@@ -169,4 +530,46 @@
     public Model getModel() {
         return driver.getModel();
     }
+
+    @SuppressWarnings("unchecked")
+    private static byte[] deserializeFromChunks(IHyracksTaskContext ctx,
+            List<ByteBuffer> chunks) throws HyracksDataException {
+        int size = chunks.get(0).getInt(0);
+        byte objectData[] = new byte[size];
+        ByteBuffer objectDataByteBuffer = ByteBuffer.wrap(objectData);
+        int remaining = size;
+        // Handle the first chunk separately, since it contains the object size.
+        int length = Math.min(chunks.get(0).array().length - BYTES_IN_INT,
+                remaining);
+        objectDataByteBuffer.put(chunks.get(0).array(), BYTES_IN_INT, length);
+        remaining -= length;
+        // Handle the remaining chunks:
+        for (int i = 1; i < chunks.size(); i++) {
+            length = Math.min(chunks.get(i).array().length, remaining);
+            objectDataByteBuffer.put(chunks.get(i).array(), 0, length);
+            remaining -= length;
+        }
+        return objectData;
+    }
+
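+    /**
+     * Frame format shared with deserializeFromChunks(): the first frame starts
+     * with a 4-byte length, and the serialized object's bytes fill the rest of
+     * that frame and as many following frames as needed.
+     */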
+    private static void serializeToFrames(IHyracksTaskContext ctx,
+            IFrameWriter writer, byte[] objectData) throws HyracksDataException {
+        ByteBuffer frame = ctx.allocateFrame();
+        int position = 0;
+        frame.position(0);
+        while (position < objectData.length) {
+            int length = Math.min(objectData.length - position, ctx
+                    .getFrameSize());
+            if (position == 0) {
+                // The first chunk is a special case, since it begins
+                // with an integer containing the length of the
+                // serialized object.
+                length = Math.min(ctx.getFrameSize() - BYTES_IN_INT, length);
+                frame.putInt(objectData.length);
+            }
+            frame.put(objectData, position, length);
+            FrameUtils.flushFrame(frame, writer);
+            position += length;
+        }
+    }
 }
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobTmp.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobTmp.java
new file mode 100644
index 0000000..089ca90
--- /dev/null
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobTmp.java
@@ -0,0 +1,62 @@
+package edu.uci.ics.hyracks.imru.api2;
+
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.imru.api.IModel;
+import edu.uci.ics.hyracks.imru.base.IJobFactory;
+import edu.uci.ics.hyracks.imru.deserialized.AbstractDeserializingIMRUJobSpecification;
+import edu.uci.ics.hyracks.imru.deserialized.IDeserializedMapFunction;
+import edu.uci.ics.hyracks.imru.deserialized.IDeserializedMapFunctionFactory;
+import edu.uci.ics.hyracks.imru.deserialized.IDeserializedReduceFunction;
+import edu.uci.ics.hyracks.imru.deserialized.IDeserializedReduceFunctionFactory;
+import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunction;
+import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunctionFactory;
+import edu.uci.ics.hyracks.imru.example.bgd2.LinearModel;
+import edu.uci.ics.hyracks.imru.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.hyracks.imru.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.hyracks.imru.runtime.IMRUDriver;
+
+public interface IMRUJobTmp<Model extends IModel, T extends Serializable>
+        extends Serializable {
+    public abstract int getCachedDataFrameSize();
+
+//    public abstract ITupleParserFactory getTupleParserFactory();
+
+    public abstract void parse(IHyracksTaskContext ctx, InputStream in, IFrameWriter writer) throws HyracksDataException;
+
+    public abstract boolean shouldTerminate(Model model);
+
+    public abstract void openMap(Model model, int cachedDataFrameSize)
+            throws HyracksDataException;
+
+    public abstract void map(ByteBuffer input, Model model,
+            int cachedDataFrameSize) throws HyracksDataException;
+
+    public abstract T closeMap(Model model, int cachedDataFrameSize)
+            throws HyracksDataException;
+
+    public abstract void openReduce() throws HyracksDataException;
+
+    public abstract void reduce(T input) throws HyracksDataException;
+
+    public abstract T closeReduce() throws HyracksDataException;
+
+    public abstract void openUpdate(Model model) throws HyracksDataException;
+
+    public abstract void update(T input, Model model)
+            throws HyracksDataException;
+
+    public abstract void closeUpdate(Model model) throws HyracksDataException;
+}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/MapOperatorDescriptor.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/MapOperatorDescriptor.java
index 0ea9eee..8980c1f 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/MapOperatorDescriptor.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/MapOperatorDescriptor.java
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.logging.Logger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -25,6 +26,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 import edu.uci.ics.hyracks.imru.api.IIMRUJobSpecification;
 import edu.uci.ics.hyracks.imru.api.IMapFunction;
+import edu.uci.ics.hyracks.imru.api.IMapFunction2;
+import edu.uci.ics.hyracks.imru.api.IMapFunctionFactory;
 import edu.uci.ics.hyracks.imru.api.IModel;
 import edu.uci.ics.hyracks.imru.base.IConfigurationFactory;
 import edu.uci.ics.hyracks.imru.data.ChunkFrameHelper;
@@ -160,20 +163,55 @@
             RunFileWriter runFileWriter = state.getRunFileWriter();
 
             Log.info("Cached example file size is " + runFileWriter.getFileSize() + " bytes");
-            RunFileReader reader = new RunFileReader(runFileWriter.getFileReference(), ctx.getIOManager(),
+            final RunFileReader reader = new RunFileReader(runFileWriter.getFileReference(), ctx.getIOManager(),
                     runFileWriter.getFileSize());
             //readInReverse
             reader.open();
-            ByteBuffer inputFrame = fileCtx.allocateFrame();
+            final ByteBuffer inputFrame = fileCtx.allocateFrame();
             ChunkFrameHelper chunkFrameHelper = new ChunkFrameHelper(ctx);
-            IMapFunction mapFunction = imruSpec.getMapFunctionFactory().createMapFunction(chunkFrameHelper.getContext(), imruSpec.getCachedDataFrameSize(), model);
-            writer = chunkFrameHelper.wrapWriter(writer, partition);
-            mapFunction.open();
-            mapFunction.setFrameWriter(writer);
-            while (reader.nextFrame(inputFrame)) {
-                mapFunction.map(inputFrame);
+            IMapFunctionFactory<Model> factory = imruSpec.getMapFunctionFactory();
+            if (factory.useAPI2()) {
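+                // Expose the cached run file to the new single-call map API as
+                // an Iterator<ByteBuffer>; note that the same inputFrame buffer
+                // is reused between calls.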
+                Iterator<ByteBuffer> input = new Iterator<ByteBuffer>() {
+                    boolean read = false;
+                    boolean hasData;
+                    @Override
+                    public void remove() {
+                    }
+
+                    @Override
+                    public ByteBuffer next() {
+                        if (!hasNext())
+                            return null;
+                        read = false;
+                        return inputFrame;
+                    }
+
+                    @Override
+                    public boolean hasNext() {
+                        try {
+                            if (!read) {
+                                hasData = reader.nextFrame(inputFrame);
+                                read = true;
+                            }
+                        } catch (HyracksDataException e) {
+                            // Surface read errors instead of silently ending the iteration.
+                            throw new RuntimeException(e);
+                        }
+                        return hasData;
+                    }
+                };
+                writer = chunkFrameHelper.wrapWriter(writer, partition);
+                IMapFunction2 mapFunction = factory.createMapFunction2(chunkFrameHelper.getContext(), imruSpec.getCachedDataFrameSize(), model);
+                mapFunction.map(input, writer);
+            } else {
+                IMapFunction mapFunction = factory.createMapFunction(chunkFrameHelper.getContext(), imruSpec.getCachedDataFrameSize(), model);
+                writer = chunkFrameHelper.wrapWriter(writer, partition);
+                mapFunction.open();
+                mapFunction.setFrameWriter(writer);
+                while (reader.nextFrame(inputFrame)) {
+                    mapFunction.map(inputFrame);
+                }
+                mapFunction.close();
             }
-            mapFunction.close();
             writer.close();
         }
 
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/ReduceOperatorDescriptor.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/ReduceOperatorDescriptor.java
index e83b59f..2f52abb 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/ReduceOperatorDescriptor.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/ReduceOperatorDescriptor.java
@@ -66,6 +66,7 @@
             reduceFunction = imruSpec.getReduceFunctionFactory().createReduceFunction(chunkFrameHelper.getContext());
             writer = chunkFrameHelper.wrapWriter(writer, partition);
             reduceFunction.setFrameWriter(writer);
+            reduceFunction.open();
         }
 
         @Override
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/deserialized/AbstractDeserializingIMRUJobSpecification.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/deserialized/AbstractDeserializingIMRUJobSpecification.java
index 227c69d..3451ef1 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/deserialized/AbstractDeserializingIMRUJobSpecification.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/deserialized/AbstractDeserializingIMRUJobSpecification.java
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -13,6 +14,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.imru.api.IIMRUJobSpecification;
 import edu.uci.ics.hyracks.imru.api.IMapFunction;
+import edu.uci.ics.hyracks.imru.api.IMapFunction2;
 import edu.uci.ics.hyracks.imru.api.IMapFunctionFactory;
 import edu.uci.ics.hyracks.imru.api.IModel;
 import edu.uci.ics.hyracks.imru.api.IReassemblingReduceFunction;
@@ -45,12 +47,21 @@
     @Override
     public final IMapFunctionFactory<Model> getMapFunctionFactory() {
         return new IMapFunctionFactory<Model>() {
+            @Override
+            public boolean useAPI2() {
+                return false;
+            }
+
+            public IMapFunction2 createMapFunction2(IHyracksTaskContext ctx,
+                    int cachedDataFrameSize, Model model) {
+                return null;
+            }
 
             @Override
             public IMapFunction createMapFunction(final IHyracksTaskContext ctx, final int cachedDataFrameSize,
                     final Model model) {
                 return new IMapFunction() {
-
+
                     private IFrameWriter writer;
                     private IDeserializedMapFunction<T> mapFunction = getDeserializedMapFunctionFactory()
                             .createMapFunction(model, cachedDataFrameSize);
diff --git a/imru/imru-core/src/test/java/edu/uci/ics/hyracks/imru/test/ImruTest.java b/imru/imru-core/src/test/java/edu/uci/ics/hyracks/imru/test/ImruTest.java
index 4647812..307cb29 100644
--- a/imru/imru-core/src/test/java/edu/uci/ics/hyracks/imru/test/ImruTest.java
+++ b/imru/imru-core/src/test/java/edu/uci/ics/hyracks/imru/test/ImruTest.java
@@ -2,6 +2,8 @@
 
 import java.io.File;
 import java.util.EnumSet;
+import java.util.logging.Handler;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
 import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
@@ -30,7 +32,7 @@
     private static NodeControllerService nc2;
     private static IHyracksClientConnection hcc;
 
-    public static void init() throws Exception {
+    public static void startControllers() throws Exception {
         CCConfig ccConfig = new CCConfig();
         ccConfig.clientNetIpAddress = CC_HOST;
         ccConfig.clusterNetIpAddress = CC_HOST;
@@ -71,6 +73,13 @@
         ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
     }
 
+    public static void disableLogging() throws Exception {
+        Logger globalLogger = Logger.getLogger("");
+        Handler[] handlers = globalLogger.getHandlers();
+        for (Handler handler : handlers)
+            globalLogger.removeHandler(handler);
+    }
+
     public static void destroyApp(String hyracksAppName) throws Exception {
         hcc.destroyApplication(hyracksAppName);
     }
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/BGDMain.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/BGDMain.java
index 3e94392..70e0d98 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/BGDMain.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/BGDMain.java
@@ -85,7 +85,7 @@
                         + " -cluster-conf imru/imru-core/src/main/resources/conf/cluster.conf"//
                         + " -example-paths /input/data.txt")
                         .split(" ");
-                ImruTest.init();
+                ImruTest.startControllers();
                 ImruTest.createApp("bgd", new File(
                         "imru/imru-example/src/main/resources/bgd.zip"));
             }
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/job/BGDMapFunctionFactory.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/job/BGDMapFunctionFactory.java
index defd0b9..9f97d4a 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/job/BGDMapFunctionFactory.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/job/BGDMapFunctionFactory.java
@@ -13,6 +13,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
 import edu.uci.ics.hyracks.imru.api.IMapFunction;
+import edu.uci.ics.hyracks.imru.api.IMapFunction2;
 import edu.uci.ics.hyracks.imru.api.IMapFunctionFactory;
 import edu.uci.ics.hyracks.imru.example.bgd.data.FragmentableFloatArray;
 import edu.uci.ics.hyracks.imru.example.bgd.data.LinearExample;
@@ -20,7 +21,15 @@
 import edu.uci.ics.hyracks.imru.example.bgd.data.RecordDescriptorUtils;
 
 public class BGDMapFunctionFactory implements IMapFunctionFactory<LinearModel> {
-
+    @Override
+    public IMapFunction2 createMapFunction2(IHyracksTaskContext ctx,
+            int cachedDataFrameSize, LinearModel model) {
+        return null;
+    }
+    @Override
+    public boolean useAPI2() {
+        return false;
+    }
     @Override
     public IMapFunction createMapFunction(final IHyracksTaskContext ctx, final int cachedDataFrameSize, final LinearModel model) {
         return new IMapFunction() {
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGDMain2.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGDMain2.java
index 80c5c67..cdb2a3a 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGDMain2.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGDMain2.java
@@ -1,12 +1,19 @@
 package edu.uci.ics.hyracks.imru.example.bgd2;
 
+import java.io.BufferedReader;
+import java.io.DataOutput;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
+import java.util.Scanner;
 import java.util.logging.Handler;
 import java.util.logging.LogManager;
 import java.util.logging.Logger;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -15,11 +22,16 @@
 import org.kohsuke.args4j.Option;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-import edu.uci.ics.hyracks.imru.api2.IMRUJob2;
+import edu.uci.ics.hyracks.imru.api2.IMRUJobTmp;
 import edu.uci.ics.hyracks.imru.api2.IMRUJobControl;
 import edu.uci.ics.hyracks.imru.base.IJobFactory;
 import edu.uci.ics.hyracks.imru.example.bgd.R;
@@ -34,7 +46,7 @@
  * 
  * @author Josh Rosen
  */
-public class BGDMain2 implements IMRUJob2<LinearModel, LossGradient> {
+public class BGDMain2 implements IMRUJobTmp<LinearModel, LossGradient> {
     private final int numFeatures;
 
     public BGDMain2(int numFeatures) {
@@ -50,7 +62,7 @@
     @Override
     public void openMap(LinearModel model, int cachedDataFrameSize)
             throws HyracksDataException {
-        R.p("openMap");
+        R.p("openMap " + this);
         lossGradientMap = new LossGradient();
         lossGradientMap.loss = 0.0f;
         lossGradientMap.gradient = new float[model.numFeatures];
@@ -64,7 +76,7 @@
             throws HyracksDataException {
         accessor.reset(input);
         int tupleCount = accessor.getTupleCount();
-        R.p("map "+tupleCount);
+        R.p("map " + tupleCount + " " + this);
         for (int i = 0; i < tupleCount; i++) {
             example.reset(accessor, i);
             float innerProduct = example.dot(model.weights);
@@ -79,18 +91,19 @@
     @Override
     public LossGradient closeMap(LinearModel model, int cachedDataFrameSize)
             throws HyracksDataException {
-        R.p("closeMap");
+        R.p("closeMap " + this);
         return lossGradientMap;
     }
 
     @Override
     public void openReduce() throws HyracksDataException {
-        R.p("openReduce");
+        R.p("openReduce " + this);
+        new Error().printStackTrace();
     }
 
     @Override
     public void reduce(LossGradient input) throws HyracksDataException {
-        R.p("reduce");
+        R.p("reduce " + this);
         if (lossGradientReduce == null) {
             lossGradientReduce = input;
         } else {
@@ -103,19 +116,19 @@
 
     @Override
     public LossGradient closeReduce() throws HyracksDataException {
-        R.p("closeReduce");
+        R.p("closeReduce " + this);
         return lossGradientReduce;
     }
 
     @Override
     public void openUpdate(LinearModel model) throws HyracksDataException {
-        R.p("openUpdate");
+        R.p("openUpdate " + this);
     }
 
     @Override
     public void update(LossGradient input, LinearModel model)
             throws HyracksDataException {
-        R.p("update");
+        R.p("update " + this);
         if (lossGradientUpdate == null) {
             lossGradientUpdate = input;
         } else {
@@ -128,7 +141,7 @@
 
     @Override
     public void closeUpdate(LinearModel model) throws HyracksDataException {
-        R.p("closeUpdate");
+        R.p("closeUpdate " + this);
         // Update loss
         model.loss = lossGradientUpdate.loss;
         model.loss += model.regularizationConstant * norm(model.weights.array);
@@ -155,7 +168,7 @@
 
     @Override
     public int getCachedDataFrameSize() {
-        return 1024 * 1024;
+        return 4 * 1024;
     }
 
     @Override
@@ -164,8 +177,88 @@
     }
 
     @Override
-    public ITupleParserFactory getTupleParserFactory() {
-        return new LibsvmExampleTupleParserFactory(numFeatures);
+    public void parse(IHyracksTaskContext ctx, InputStream in,
+            IFrameWriter writer) throws HyracksDataException {
+        ByteBuffer frame = ctx.allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(frame, true);
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+        int activeFeatures = 0;
+        try {
+            Pattern whitespacePattern = Pattern.compile("\\s+");
+            Pattern labelFeaturePattern = Pattern.compile("[:=]");
+            String line;
+            boolean firstLine = true;
+            while (true) {
+                tb.reset();
+                if (firstLine) {
+                    long start = System.currentTimeMillis();
+                    line = reader.readLine();
+                    long end = System.currentTimeMillis();
+                    // LOG.info("First call to reader.readLine() took " + (end -
+                    // start) + " milliseconds");
+                    firstLine = false;
+                } else {
+                    line = reader.readLine();
+                }
+                if (line == null) {
+                    break;
+                }
+                String[] comps = whitespacePattern.split(line, 2);
+
+                // Label
+                // Ignore leading plus sign
+                if (comps[0].charAt(0) == '+') {
+                    comps[0] = comps[0].substring(1);
+                }
+
+                int label = Integer.parseInt(comps[0]);
+                dos.writeInt(label);
+                tb.addFieldEndOffset();
+                Scanner scan = new Scanner(comps[1]);
+                scan.useDelimiter(",|\\s+");
+                while (scan.hasNext()) {
+                    String[] parts = labelFeaturePattern.split(scan.next());
+                    int index = Integer.parseInt(parts[0]);
+                    if (index > numFeatures) {
+                        throw new IndexOutOfBoundsException("Feature index "
+                                + index
+                                + " exceed the declared number of features ("
+                                + numFeatures + ")");
+                    }
+                    // Ignore leading plus sign.
+                    if (parts[1].charAt(0) == '+') {
+                        parts[1] = parts[1].substring(1);
+                    }
+                    float value = Float.parseFloat(parts[1]);
+                    dos.writeInt(index);
+                    dos.writeFloat(value);
+                    activeFeatures++;
+                }
+                dos.writeInt(-1); // Marks the end of the sparse array.
+                tb.addFieldEndOffset();
+                if (!appender.append(tb.getFieldEndOffsets(),
+                        tb.getByteArray(), 0, tb.getSize())) {
+                    FrameUtils.flushFrame(frame, writer);
+                    appender.reset(frame, true);
+                    if (!appender.append(tb.getFieldEndOffsets(), tb
+                            .getByteArray(), 0, tb.getSize())) {
+                        // LOG.severe("Example too large to fit in frame: " +
+                        // line);
+                        throw new IllegalStateException();
+                    }
+                }
+            }
+            if (appender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(frame, writer);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        // LOG.info("Parsed input partition containing " + activeFeatures +
+        // " active features");
     }
 
     private static class Options {
@@ -209,11 +302,6 @@
 
     public static void main(String[] args) throws Exception {
         try {
-            Logger globalLogger = Logger.getLogger("");
-            Handler[] handlers = globalLogger.getHandlers();
-            for (Handler handler : handlers) {
-                globalLogger.removeHandler(handler);
-            }
             if (args.length == 0) {
                 args = ("-host localhost"//
                         + " -app bgd"//
@@ -226,9 +314,10 @@
                         + " -model-file /tmp/__imru.txt"//
                         + " -cluster-conf imru/imru-core/src/main/resources/conf/cluster.conf"//
                         + " -example-paths /input/data.txt").split(" ");
-                ImruTest.init();
+                ImruTest.startControllers();
                 ImruTest.createApp("bgd", new File(
-                        "imru/imru-example/src/main/resources/bgd.zip"));
+                        "imru/imru-example/src/main/resources/bootstrap.zip"));
+                ImruTest.disableLogging();
             }
             Options options = new Options();
             CmdLineParser parser = new CmdLineParser(options);
@@ -241,31 +330,19 @@
 
             // copy input files to HDFS
             FileSystem dfs = FileSystem.get(control.conf);
-            for (FileStatus f : dfs.listStatus(new Path("/tmp")))
-                dfs.delete(f.getPath());
+            if (dfs.listStatus(new Path("/tmp")) != null)
+                for (FileStatus f : dfs.listStatus(new Path("/tmp")))
+                    dfs.delete(f.getPath());
             dfs.copyFromLocalFile(new Path("/data/imru/test/data.txt"),
                     new Path("/input/data.txt"));
 
-            IJobFactory jobFactory;
-
             if (options.aggTreeType.equals("none")) {
-                jobFactory = new NoAggregationIMRUJobFactory(
-                        options.examplePaths, control.confFactory);
+                control.selectNoAggregation(options.examplePaths);
             } else if (options.aggTreeType.equals("generic")) {
-                if (options.aggCount < 1) {
-                    throw new IllegalArgumentException(
-                            "Must specify a nonnegative aggregator count using the -agg-count option");
-                }
-                jobFactory = new GenericAggregationIMRUJobFactory(
-                        options.examplePaths, control.confFactory,
+                control.selectGenericAggregation(options.examplePaths,
                         options.aggCount);
             } else if (options.aggTreeType.equals("nary")) {
-                if (options.fanIn < 1) {
-                    throw new IllegalArgumentException(
-                            "Must specify nonnegative -fan-in");
-                }
-                jobFactory = new NAryAggregationIMRUJobFactory(
-                        options.examplePaths, control.confFactory,
+                control.selectNAryAggregation(options.examplePaths,
                         options.fanIn);
             } else {
                 throw new IllegalArgumentException(
@@ -274,8 +351,8 @@
 
             LinearModel initalModel = new LinearModel(8000, options.numRounds);
 
-            JobStatus status = control.run(job, initalModel, jobFactory,
-                    options.tempPath, options.app);
+            JobStatus status = control.run(job, initalModel, options.tempPath,
+                    options.app);
             if (status == JobStatus.FAILURE) {
                 System.err.println("Job failed; see CC and NC logs");
                 System.exit(-1);
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/LibsvmExampleTupleParserFactory.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/LibsvmExampleTupleParserFactory.java
deleted file mode 100644
index 66b8b69..0000000
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/LibsvmExampleTupleParserFactory.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package edu.uci.ics.hyracks.imru.example.bgd2;
-
-import java.io.BufferedReader;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.ByteBuffer;
-import java.util.Scanner;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class LibsvmExampleTupleParserFactory implements ITupleParserFactory {
-    private static final long serialVersionUID = 1L;
-    final int featureLength;
-    private static Logger LOG = Logger.getLogger(LibsvmExampleTupleParserFactory.class.getName());
-
-    public LibsvmExampleTupleParserFactory(int featureLength) {
-        this.featureLength = featureLength;
-    }
-
-    @Override
-    public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
-        return new ITupleParser() {
-
-            @Override
-            public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-                ByteBuffer frame = ctx.allocateFrame();
-                FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-                appender.reset(frame, true);
-                ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
-                DataOutput dos = tb.getDataOutput();
-                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-                int activeFeatures = 0;
-                try {
-                    Pattern whitespacePattern = Pattern.compile("\\s+");
-                    Pattern labelFeaturePattern = Pattern.compile("[:=]");
-                    String line;
-                    boolean firstLine = true;
-                    while (true) {
-                        tb.reset();
-                        if (firstLine) {
-                            long start = System.currentTimeMillis();
-                            line = reader.readLine();
-                            long end = System.currentTimeMillis();
-                            LOG.info("First call to reader.readLine() took " + (end - start) + " milliseconds");
-                            firstLine = false;
-                        } else {
-                            line = reader.readLine();
-                        }
-                        if (line == null) {
-                            break;
-                        }
-                        String[] comps = whitespacePattern.split(line, 2);
-
-                        // Label
-                        // Ignore leading plus sign
-                        if (comps[0].charAt(0) == '+') {
-                            comps[0] = comps[0].substring(1);
-                        }
-
-                        int label = Integer.parseInt(comps[0]);
-                        dos.writeInt(label);
-                        tb.addFieldEndOffset();
-                        Scanner scan = new Scanner(comps[1]);
-                        scan.useDelimiter(",|\\s+");
-                        while (scan.hasNext()) {
-                            String[] parts = labelFeaturePattern.split(scan.next());
-                            int index = Integer.parseInt(parts[0]);
-                            if (index > featureLength) {
-                                throw new IndexOutOfBoundsException("Feature index " + index
-                                        + " exceed the declared number of features (" + featureLength + ")");
-                            }
-                            // Ignore leading plus sign.
-                            if (parts[1].charAt(0) == '+') {
-                                parts[1] = parts[1].substring(1);
-                            }
-                            float value = Float.parseFloat(parts[1]);
-                            dos.writeInt(index);
-                            dos.writeFloat(value);
-                            activeFeatures++;
-                        }
-                        dos.writeInt(-1); // Marks the end of the sparse array.
-                        tb.addFieldEndOffset();
-                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                            FrameUtils.flushFrame(frame, writer);
-                            appender.reset(frame, true);
-                            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                                LOG.severe("Example too large to fit in frame: " + line);
-                                throw new IllegalStateException();
-                            }
-                        }
-                    }
-                    if (appender.getTupleCount() > 0) {
-                        FrameUtils.flushFrame(frame, writer);
-                    }
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-                LOG.info("Parsed input partition containing " + activeFeatures + " active features");
-            }
-        };
-    }
-}
\ No newline at end of file
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIncrementalResult.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIncrementalResult.java
new file mode 100644
index 0000000..4472f11
--- /dev/null
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIncrementalResult.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.imru.example.helloworld;
+
+import java.io.Serializable;
+
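+/**
+ * Partial result produced by map() and combined by reduce()/update();
+ * Serializable so it can be shipped between operators.
+ */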
+public class HelloWorldIncrementalResult implements Serializable {
+    public int length;
+}
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldModel.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldModel.java
new file mode 100644
index 0000000..3728caa
--- /dev/null
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldModel.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.hyracks.imru.example.helloworld;
+
+import edu.uci.ics.hyracks.imru.api.IModel;
+
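+/**
+ * Shared model: totalLength accumulates word lengths across rounds;
+ * roundsRemaining counts down to drive shouldTerminate().
+ */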
+public class HelloWorldModel implements IModel {
+    public int totalLength;
+    public int roundsRemaining = 5;
+}
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/IMRUHelloWorld.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/IMRUHelloWorld.java
new file mode 100644
index 0000000..e48c43c
--- /dev/null
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/IMRUHelloWorld.java
@@ -0,0 +1,229 @@
+package edu.uci.ics.hyracks.imru.example.helloworld;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.imru.api2.IMRUJob;
+import edu.uci.ics.hyracks.imru.api2.IMRUJobTmp;
+import edu.uci.ics.hyracks.imru.api2.IMRUJobControl;
+import edu.uci.ics.hyracks.imru.example.bgd.R;
+import edu.uci.ics.hyracks.imru.test.ImruTest;
+
+public class IMRUHelloWorld extends
+        IMRUJob<HelloWorldModel, HelloWorldIncrementalResult> {
+    static AtomicInteger nextId = new AtomicInteger();
+    int id;
+
+    public IMRUHelloWorld() {
+        this.id = nextId.getAndIncrement();
+    }
+
+    @Override
+    public HelloWorldModel initModel() {
+        return new HelloWorldModel();
+    }
+
+    @Override
+    public int getCachedDataFrameSize() {
+        return 256;
+    }
+
+    /**
+     * Parse input data and create frames
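+     * (e.g., the input "hello world" yields two single-tuple frames
+     * carrying the fields ("hello", "hello_copy") and ("world", "world_copy"))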
+     */
+    @Override
+    public void parse(IHyracksTaskContext ctx, InputStream in,
+            IFrameWriter writer) throws HyracksDataException {
+        ByteBuffer frame = ctx.allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(frame, true);
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+        DataOutput dos = tb.getDataOutput();
+        try {
+            BufferedReader reader = new BufferedReader(
+                    new InputStreamReader(in));
+            String line = reader.readLine();
+            reader.close();
+            if (line == null)
+                return; // empty input split: nothing to parse
+            for (String s : line.split(" ")) {
+                // clear the frame for reuse
+                appender.reset(frame, true);
+
+                tb.reset();
+                // add one field
+                dos.writeUTF(s);
+                tb.addFieldEndOffset();
+                // add another field
+                dos.writeUTF(s + "_copy");
+                tb.addFieldEndOffset();
+                if (!appender.append(tb.getFieldEndOffsets(),
+                        tb.getByteArray(), 0, tb.getSize())) {
+                    // if frame can't hold this tuple
+                    throw new IllegalStateException(
+                            "Example too large to fit in frame: " + line);
+                }
+                FrameUtils.flushFrame(frame, writer);
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
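+    /**
+     * Combine one partition of cached frames into an incremental result;
+     * here, the total number of characters in the words read.
+     */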
+    @Override
+    public HelloWorldIncrementalResult map(Iterator<ByteBuffer> input,
+            HelloWorldModel model, int cachedDataFrameSize)
+            throws HyracksDataException {
+        HelloWorldIncrementalResult mapResult;
+        IFrameTupleAccessor accessor;
+        R.p("openMap" + id+" model="+ model.totalLength);
+        mapResult = new HelloWorldIncrementalResult();
+        accessor = new FrameTupleAccessor(cachedDataFrameSize,
+                new RecordDescriptor(new ISerializerDeserializer[2]));
+        while (input.hasNext()) {
+            ByteBuffer buf = input.next();
+            try {
+                accessor.reset(buf);
+                int tupleCount = accessor.getTupleCount();
+                ByteBufferInputStream bbis = new ByteBufferInputStream();
+                DataInputStream di = new DataInputStream(bbis);
+                for (int i = 0; i < tupleCount; i++) {
+                    int fieldId = 0;
+                    int startOffset = accessor.getFieldSlotsLength()
+                            + accessor.getTupleStartOffset(i)
+                            + accessor.getFieldStartOffset(i, fieldId);
+                    bbis.setByteBuffer(accessor.getBuffer(), startOffset);
+                    String word = di.readUTF();
+                    R.p("map%d read frame: %s", id, word);
+                    mapResult.length += word.length();
+                }
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        R.p("closeMap" + id + " output=" + mapResult.length);
+        return mapResult;
+    }
+
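+    /**
+     * Merge incremental results from multiple mappers (or from
+     * lower-level reducers in an aggregation tree) into one.
+     */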
+    @Override
+    public HelloWorldIncrementalResult reduce(
+            Iterator<HelloWorldIncrementalResult> input)
+            throws HyracksDataException {
+        HelloWorldIncrementalResult reduceResult;
+        R.p("openReduce" + id);
+        reduceResult = new HelloWorldIncrementalResult();
+        while (input.hasNext()) {
+            HelloWorldIncrementalResult result = input.next();
+            R.p("reduce" + id + " input=" + result.length);
+            reduceResult.length += result.length;
+        }
+        R.p("closeReduce" + id + " output=" + reduceResult.length);
+        return reduceResult;
+    }
+
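+    /**
+     * Fold the reduced result into the model and consume one round.
+     */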
+    @Override
+    public void update(Iterator<HelloWorldIncrementalResult> input,
+            HelloWorldModel model) throws HyracksDataException {
+        R.p("openUpdate" + id + " input=" + model.totalLength);
+        while (input.hasNext()) {
+            HelloWorldIncrementalResult result = input.next();
+            R.p("update" + id);
+            model.totalLength += result.length;
+        }
+        R.p("closeUpdate" + id + " output=" + model.totalLength);
+        model.roundsRemaining--;
+    }
+
+    /**
+     * Return true to exit loop
+     */
+    @Override
+    public boolean shouldTerminate(HelloWorldModel model) {
+        return model.roundsRemaining == 0;
+    }
+
+    public static void main(String[] args) throws Exception {
+        try {
+            boolean debugging = true; // start everything in one process
+            ImruTest.disableLogging();
+            String host = "localhost";
+            int port = 3099;
+            String app = "imru_helloworld";
+
+            // Hadoop 0.20.2 needs to be running
+            String hadoopConfPath = "/data/imru/hadoop-0.20.2/conf";
+
+            // HDFS directory that holds the intermediate models
+            String tempPath = "/helloworld";
+
+            // config file that lists the node names
+            String clusterConfPath = "imru/imru-core/src/main/resources/conf/cluster.conf";
+
+            // HDFS path of the input data
+            String examplePaths = "/helloworld/input.txt";
+            if (debugging) {
+                ImruTest.startControllers();
+                // minimal bootstrap bundle needed to invoke
+                // edu.uci.ics.hyracks.imru.runtime.bootstrap.IMRUNCBootstrapImpl
+                ImruTest.createApp(app, new File(
+                        "imru/imru-example/src/main/resources/bootstrap.zip"));
+            }
+            IMRUHelloWorld job = new IMRUHelloWorld();
+            IMRUJobControl<HelloWorldModel, HelloWorldIncrementalResult> control = new IMRUJobControl<HelloWorldModel, HelloWorldIncrementalResult>();
+            control.connect(host, port, hadoopConfPath, clusterConfPath);
+
+            // remove old intermediate models
+            FileSystem dfs = FileSystem.get(control.conf);
+            FileStatus[] oldModels = dfs.listStatus(new Path(tempPath));
+            if (oldModels != null) // null when the directory doesn't exist
+                for (FileStatus f : oldModels)
+                    dfs.delete(f.getPath());
+
+            // create input file
+            FSDataOutputStream out = dfs.create(new Path(examplePaths), true);
+            out.write("hello world".getBytes());
+            out.close();
+
+            // set aggregation type
+            // control.selectNAryAggregation(examplePaths, 2);
+            control.selectGenericAggregation(examplePaths, 1);
+
+            JobStatus status = control.run(job, tempPath, app);
+            if (status == JobStatus.FAILURE) {
+                System.err.println("Job failed; see CC and NC logs");
+                System.exit(-1);
+            }
+            int iterationCount = control.getIterationCount();
+            HelloWorldModel finalModel = control.getModel();
+            System.out.println("Terminated after " + iterationCount
+                    + " iterations");
+            R.p("FinalModel: " + finalModel.totalLength);
+        } catch (Throwable e) {
+            e.printStackTrace();
+        }
+        System.exit(0);
+    }
+
+}
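As a sanity check on the expected output of this example (assuming a single map partition and that the cached frames are re-read every round): parse() turns "hello world" into two frames, map() sums the word lengths 5 + 5 = 10, reduce() forwards the single partial result, and update() adds 10 to totalLength while decrementing roundsRemaining from its initial value of 5. The run should therefore stop after 5 iterations and print "FinalModel: 50".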
diff --git a/imru/imru-example/src/main/resources/bgd.zip b/imru/imru-example/src/main/resources/bootstrap.zip
similarity index 100%
rename from imru/imru-example/src/main/resources/bgd.zip
rename to imru/imru-example/src/main/resources/bootstrap.zip
Binary files differ