Add new IMRU API and helloworld example.

Almost working; one bug remains.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_imru@2604 123451ca-8445-de46-9d55-352943316053
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction.java
index d914cad..1702967 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction.java
@@ -1,6 +1,8 @@
package edu.uci.ics.hyracks.imru.api;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction2.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction2.java
new file mode 100644
index 0000000..970854c
--- /dev/null
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunction2.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.hyracks.imru.api;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
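+/**
+ * Pull-based map API: the map function drains an iterator over the cached
+ * input frames and writes its serialized output to the frame writer.
+ */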
+public interface IMapFunction2 {
+ void map(Iterator<ByteBuffer> input, IFrameWriter writer) throws HyracksDataException;
+}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunctionFactory.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunctionFactory.java
index 6eab539..a0a66eb 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunctionFactory.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api/IMapFunctionFactory.java
@@ -3,5 +3,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
public interface IMapFunctionFactory<Model extends IModel> {
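+ /** Whether this factory provides the pull-based IMapFunction2 instead of IMapFunction. */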
+ boolean useAPI2();
IMapFunction createMapFunction(IHyracksTaskContext ctx, int cachedDataFrameSize, Model model);
+ IMapFunction2 createMapFunction2(IHyracksTaskContext ctx, int cachedDataFrameSize, Model model);
}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/ASyncIO.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/ASyncIO.java
new file mode 100644
index 0000000..9fc35f3
--- /dev/null
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/ASyncIO.java
@@ -0,0 +1,67 @@
+package edu.uci.ics.hyracks.imru.api2;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
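+/**
+ * A single-producer/single-consumer handoff buffer: the dataflow thread
+ * calls add() for each chunk and close() at end of stream, while a worker
+ * thread consumes the data through the blocking iterator from getInput().
+ */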
+public class ASyncIO {
+ private LinkedList<byte[]> queue = new LinkedList<byte[]>();
+ private boolean more = true;
+
+ public ASyncIO() {
+ }
+
+ public void close() throws HyracksDataException {
+ synchronized (queue) {
+ more = false;
+ queue.notifyAll();
+ }
+ }
+
+ public void add(byte[] data) throws HyracksDataException {
+ synchronized (queue) {
+ queue.addLast(data);
+ queue.notifyAll();
+ }
+ }
+
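+ /** Returns a blocking iterator that drains the queue until close() is called. */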
+ public Iterator<byte[]> getInput() {
+ return new Iterator<byte[]>() {
+ byte[] data;
+
+ @Override
+ public void remove() {
+ }
+
+ @Override
+ public byte[] next() {
+ if (!hasNext())
+ return null;
+ byte[] data2 = data;
+ data = null;
+ return data2;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ if (data == null) {
+ synchronized (queue) {
+ while (queue.size() == 0 && more) {
+ queue.wait();
+ }
+ if (queue.size() > 0)
+ data = queue.removeFirst();
+ }
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag instead of swallowing the exception.
+ Thread.currentThread().interrupt();
+ }
+ return data != null;
+ }
+ };
+ }
+}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob.java
new file mode 100644
index 0000000..226f1b8
--- /dev/null
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.imru.api2;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
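+/**
+ * Object-based IMRU job API: map turns cached input frames into one
+ * IntermediateResult per partition, reduce combines intermediate results,
+ * and update folds the combined result into the model each iteration.
+ */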
+abstract public class IMRUJob<Model, IntermediateResult> implements
+ Serializable {
+ abstract public IntermediateResult map(Iterator<ByteBuffer> input,
+ Model model, int cachedDataFrameSize) throws HyracksDataException;
+
+ abstract public IntermediateResult reduce(Iterator<IntermediateResult> input)
+ throws HyracksDataException;
+
+ abstract public void update(Iterator<IntermediateResult> input, Model model)
+ throws HyracksDataException;
+
+ abstract public Model initModel();
+
+ public abstract int getCachedDataFrameSize();
+
+ public abstract void parse(IHyracksTaskContext ctx, InputStream in,
+ IFrameWriter writer) throws HyracksDataException;
+
+ public abstract boolean shouldTerminate(Model model);
+
+}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob2.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob2.java
index c2d2899..2d86880 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob2.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJob2.java
@@ -1,57 +1,32 @@
package edu.uci.ics.hyracks.imru.api2;
-import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.Iterator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-import edu.uci.ics.hyracks.imru.api.IModel;
-import edu.uci.ics.hyracks.imru.base.IJobFactory;
-import edu.uci.ics.hyracks.imru.deserialized.AbstractDeserializingIMRUJobSpecification;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedMapFunction;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedMapFunctionFactory;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedReduceFunction;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedReduceFunctionFactory;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunction;
-import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunctionFactory;
-import edu.uci.ics.hyracks.imru.example.bgd2.LinearModel;
-import edu.uci.ics.hyracks.imru.hadoop.config.ConfigurationFactory;
-import edu.uci.ics.hyracks.imru.jobgen.clusterconfig.ClusterConfig;
-import edu.uci.ics.hyracks.imru.runtime.IMRUDriver;
-public interface IMRUJob2<Model extends IModel, T extends Serializable>
- extends Serializable {
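+/**
+ * Byte-stream IMRU job API: intermediate results cross operator boundaries
+ * as raw bytes, leaving serialization entirely to the job implementation.
+ */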
+abstract public class IMRUJob2<Model> implements Serializable {
+ abstract public void map(Iterator<ByteBuffer> input, Model model,
+ OutputStream output, int cachedDataFrameSize) throws HyracksDataException;
+
+ abstract public void reduce(IHyracksTaskContext ctx,
+ Iterator<byte[]> input, OutputStream output)
+ throws HyracksDataException;
+
+ abstract public void update(IHyracksTaskContext ctx,
+ Iterator<byte[]> input, Model model) throws HyracksDataException;
+
+ abstract public Model initModel();
+
public abstract int getCachedDataFrameSize();
- public abstract ITupleParserFactory getTupleParserFactory();
+ public abstract void parse(IHyracksTaskContext ctx, InputStream in,
+ IFrameWriter writer) throws HyracksDataException;
public abstract boolean shouldTerminate(Model model);
-
- public abstract void openMap(Model model, int cachedDataFrameSize)
- throws HyracksDataException;
-
- public abstract void map(ByteBuffer input, Model model,
- int cachedDataFrameSize) throws HyracksDataException;
-
- public abstract T closeMap(Model model, int cachedDataFrameSize)
- throws HyracksDataException;
-
- public abstract void openReduce() throws HyracksDataException;
-
- public abstract void reduce(T input) throws HyracksDataException;
-
- public abstract T closeReduce() throws HyracksDataException;
-
- public abstract void openUpdate(Model model) throws HyracksDataException;
-
- public abstract void update(T input, Model model)
- throws HyracksDataException;
-
- public abstract void closeUpdate(Model model) throws HyracksDataException;
}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
index bb0062a..afebdc9 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobControl.java
@@ -1,17 +1,42 @@
package edu.uci.ics.hyracks.imru.api2;
+import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.imru.api.IIMRUJobSpecification;
+import edu.uci.ics.hyracks.imru.api.IMapFunction;
+import edu.uci.ics.hyracks.imru.api.IMapFunction2;
+import edu.uci.ics.hyracks.imru.api.IMapFunctionFactory;
import edu.uci.ics.hyracks.imru.api.IModel;
+import edu.uci.ics.hyracks.imru.api.IReassemblingReduceFunction;
+import edu.uci.ics.hyracks.imru.api.IReassemblingUpdateFunction;
+import edu.uci.ics.hyracks.imru.api.IReduceFunction;
+import edu.uci.ics.hyracks.imru.api.IReduceFunctionFactory;
+import edu.uci.ics.hyracks.imru.api.IUpdateFunction;
+import edu.uci.ics.hyracks.imru.api.IUpdateFunctionFactory;
import edu.uci.ics.hyracks.imru.base.IJobFactory;
import edu.uci.ics.hyracks.imru.deserialized.AbstractDeserializingIMRUJobSpecification;
import edu.uci.ics.hyracks.imru.deserialized.IDeserializedMapFunction;
@@ -20,7 +45,11 @@
import edu.uci.ics.hyracks.imru.deserialized.IDeserializedReduceFunctionFactory;
import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunction;
import edu.uci.ics.hyracks.imru.deserialized.IDeserializedUpdateFunctionFactory;
+import edu.uci.ics.hyracks.imru.example.bgd.R;
import edu.uci.ics.hyracks.imru.hadoop.config.ConfigurationFactory;
+import edu.uci.ics.hyracks.imru.jobgen.GenericAggregationIMRUJobFactory;
+import edu.uci.ics.hyracks.imru.jobgen.NAryAggregationIMRUJobFactory;
+import edu.uci.ics.hyracks.imru.jobgen.NoAggregationIMRUJobFactory;
import edu.uci.ics.hyracks.imru.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.hyracks.imru.runtime.IMRUDriver;
@@ -28,6 +57,9 @@
public HyracksConnection hcc;
public Configuration conf = new Configuration();
public ConfigurationFactory confFactory;
+ IJobFactory jobFactory;
+ private static final int BYTES_IN_INT = 4;
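+ // Shared pool for the reduce/update consumer threads fed by ASyncIO.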
+ private static ExecutorService threadPool = Executors.newCachedThreadPool();
public void connect(String ccHost, int ccPort, String hadoopConfPath,
String clusterConfPath) throws Exception {
@@ -45,19 +77,52 @@
confFactory = new ConfigurationFactory(conf);
}
+ public void selectNoAggregation(String examplePaths) {
+ jobFactory = new NoAggregationIMRUJobFactory(examplePaths, confFactory);
+ }
+
+ public void selectGenericAggregation(String examplePaths, int aggCount) {
+ if (aggCount < 1)
+ throw new IllegalArgumentException(
+ "Must specify a nonnegative aggregator count using the -agg-count option");
+ jobFactory = new GenericAggregationIMRUJobFactory(examplePaths,
+ confFactory, aggCount);
+ }
+
+ public void selectNAryAggregation(String examplePaths, int fanIn) {
+ if (fanIn < 1) {
+ throw new IllegalArgumentException(
+ "Must specify nonnegative -fan-in");
+ }
+ jobFactory = new NAryAggregationIMRUJobFactory(examplePaths,
+ confFactory, fanIn);
+ }
+
IMRUDriver<Model> driver;
- static class JobSpecification<Model extends IModel, T extends Serializable>
+ static class JobSpecificationTmp<Model extends IModel, T extends Serializable>
extends AbstractDeserializingIMRUJobSpecification<Model, T> {
- IMRUJob2<Model, T> job2;
+ IMRUJobTmp<Model, T> job2;
- public JobSpecification(IMRUJob2<Model, T> job2) {
+ public JobSpecificationTmp(IMRUJobTmp<Model, T> job2) {
this.job2 = job2;
}
@Override
public ITupleParserFactory getTupleParserFactory() {
- return job2.getTupleParserFactory();
+ return new ITupleParserFactory() {
+ @Override
+ public ITupleParser createTupleParser(
+ final IHyracksTaskContext ctx) {
+ return new ITupleParser() {
+ @Override
+ public void parse(InputStream in, IFrameWriter writer)
+ throws HyracksDataException {
+ job2.parse(ctx, in, writer);
+ }
+ };
+ }
+ };
}
@Override
@@ -147,15 +212,311 @@
}
}
- public JobStatus run(IMRUJob2<Model, T> job2, Model initialModel,
- IJobFactory jobFactory, String tempPath, String app)
- throws Exception {
- JobSpecification<Model, T> job = new JobSpecification<Model, T>(job2);
+ public JobStatus run(IMRUJobTmp<Model, T> job2, Model initialModel,
+ String tempPath, String app) throws Exception {
+ JobSpecificationTmp<Model, T> job = new JobSpecificationTmp<Model, T>(
+ job2);
driver = new IMRUDriver<Model>(hcc, job, initialModel, jobFactory,
conf, tempPath, app);
return driver.run();
}
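+ /**
+ * Adapts the byte-stream IMRUJob2 API to the engine's
+ * IIMRUJobSpecification interface.
+ */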
+ static class JobSpecification2<Model extends IModel> implements
+ IIMRUJobSpecification<Model> {
+ IMRUJob2<Model> job2;
+
+ public JobSpecification2(IMRUJob2<Model> job2) {
+ this.job2 = job2;
+ }
+
+ @Override
+ public ITupleParserFactory getTupleParserFactory() {
+ return new ITupleParserFactory() {
+ @Override
+ public ITupleParser createTupleParser(
+ final IHyracksTaskContext ctx) {
+ return new ITupleParser() {
+ @Override
+ public void parse(InputStream in, IFrameWriter writer)
+ throws HyracksDataException {
+ job2.parse(ctx, in, writer);
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public int getCachedDataFrameSize() {
+ return job2.getCachedDataFrameSize();
+ }
+
+ @Override
+ public boolean shouldTerminate(Model model) {
+ return job2.shouldTerminate(model);
+ }
+
+ @Override
+ public IMapFunctionFactory<Model> getMapFunctionFactory() {
+ return new IMapFunctionFactory<Model>() {
+ @Override
+ public boolean useAPI2() {
+ return true;
+ }
+
+ @Override
+ public IMapFunction2 createMapFunction2(
+ final IHyracksTaskContext ctx,
+ final int cachedDataFrameSize, final Model model) {
+ return new IMapFunction2() {
+ @Override
+ public void map(Iterator<ByteBuffer> input,
+ IFrameWriter writer)
+ throws HyracksDataException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ job2.map(input, model, out, cachedDataFrameSize);
+ byte[] objectData = out.toByteArray();
+ serializeToFrames(ctx, writer, objectData);
+ }
+ };
+ }
+
+ @Override
+ public IMapFunction createMapFunction(
+ final IHyracksTaskContext ctx,
+ final int cachedDataFrameSize, final Model model) {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public IReduceFunctionFactory getReduceFunctionFactory() {
+ return new IReduceFunctionFactory() {
+ @Override
+ public IReduceFunction createReduceFunction(
+ final IHyracksTaskContext ctx) {
+ return new IReassemblingReduceFunction() {
+ private IFrameWriter writer;
+ private ASyncIO io;
+
+ @Override
+ public void setFrameWriter(IFrameWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ io = new ASyncIO();
+ threadPool.execute(new Runnable() {
+ @Override
+ public void run() {
+ Iterator<byte[]> input = io.getInput();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ job2.reduce(ctx, input, out);
+ byte[] objectData = out.toByteArray();
+ serializeToFrames(ctx, writer,
+ objectData);
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ io.close();
+ }
+
+ @Override
+ public void reduce(List<ByteBuffer> chunks)
+ throws HyracksDataException {
+ byte[] data = deserializeFromChunks(ctx, chunks);
+ io.add(data);
+ }
+ };
+ }
+ };
+ }
+
+ @Override
+ public IUpdateFunctionFactory<Model> getUpdateFunctionFactory() {
+ return new IUpdateFunctionFactory<Model>() {
+
+ @Override
+ public IUpdateFunction createUpdateFunction(
+ final IHyracksTaskContext ctx, final Model model) {
+ return new IReassemblingUpdateFunction() {
+ private ASyncIO io;
+
+ @Override
+ public void open() throws HyracksDataException {
+ io = new ASyncIO();
+ threadPool.execute(new Runnable() {
+ @Override
+ public void run() {
+ Iterator<byte[]> input = io.getInput();
+ try {
+ job2.update(ctx, input, model);
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ io.close();
+ }
+
+ @Override
+ public void update(List<ByteBuffer> chunks)
+ throws HyracksDataException {
+ byte[] data = deserializeFromChunks(ctx, chunks);
+ io.add(data);
+ }
+ };
+ }
+ };
+ }
+ }
+
+ public JobStatus run2(IMRUJob2<Model> job2, Model initialModel,
+ String tempPath, String app) throws Exception {
+ JobSpecification2<Model> job = new JobSpecification2<Model>(job2);
+ driver = new IMRUDriver<Model>(hcc, job, initialModel, jobFactory,
+ conf, tempPath, app);
+ return driver.run();
+ }
+
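+ /**
+ * Bridges the object-based IMRUJob API onto IMRUJob2 by serializing
+ * intermediate results with Java serialization.
+ */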
+ static class Job<Model extends IModel, T extends Serializable> extends
+ IMRUJob2<Model> {
+ IMRUJob<Model, T> job;
+
+ public Job(IMRUJob<Model, T> job) {
+ this.job = job;
+ }
+
+ @Override
+ public int getCachedDataFrameSize() {
+ return job.getCachedDataFrameSize();
+ }
+
+ @Override
+ public Model initModel() {
+ return job.initModel();
+ }
+
+ @Override
+ public void map(Iterator<ByteBuffer> input, Model model,
+ OutputStream output, int cachedDataFrameSize)
+ throws HyracksDataException {
+ T object = job.map(input, model, cachedDataFrameSize);
+ byte[] objectData;
+ try {
+ objectData = JavaSerializationUtils.serialize(object);
+ output.write(objectData);
+ output.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void parse(IHyracksTaskContext ctx, InputStream in,
+ IFrameWriter writer) throws HyracksDataException {
+ job.parse(ctx, in, writer);
+ }
+
+ @Override
+ public void reduce(final IHyracksTaskContext ctx,
+ final Iterator<byte[]> input, OutputStream output)
+ throws HyracksDataException {
+ Iterator<T> iterator = new Iterator<T>() {
+ @Override
+ public void remove() {
+ }
+
+ @Override
+ public boolean hasNext() {
+ return input.hasNext();
+ }
+
+ @Override
+ public T next() {
+ byte[] objectData = input.next();
+ if (objectData == null)
+ return null;
+ NCApplicationContext appContext = (NCApplicationContext) ctx
+ .getJobletContext().getApplicationContext();
+ try {
+ return (T) appContext.deserialize(objectData);
+ } catch (Exception e) {
+ // A deserialization failure is unrecoverable for this job.
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ T object = job.reduce(iterator);
+ byte[] objectData;
+ try {
+ objectData = JavaSerializationUtils.serialize(object);
+ output.write(objectData);
+ output.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public boolean shouldTerminate(Model model) {
+ return job.shouldTerminate(model);
+ }
+
+ @Override
+ public void update(final IHyracksTaskContext ctx,
+ final Iterator<byte[]> input, Model model)
+ throws HyracksDataException {
+ Iterator<T> iterator = new Iterator<T>() {
+ @Override
+ public void remove() {
+ }
+
+ @Override
+ public boolean hasNext() {
+ return input.hasNext();
+ }
+
+ @Override
+ public T next() {
+ byte[] objectData = input.next();
+ if (objectData == null)
+ return null;
+ NCApplicationContext appContext = (NCApplicationContext) ctx
+ .getJobletContext().getApplicationContext();
+ try {
+ return (T) appContext.deserialize(objectData);
+ } catch (Exception e) {
+ // A deserialization failure is unrecoverable for this job.
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ job.update(iterator, model);
+ }
+ }
+
+ public JobStatus run(final IMRUJob<Model, T> job, String tempPath,
+ String app) throws Exception {
+ Model initialModel = job.initModel();
+ JobSpecification2<Model> job2 = new JobSpecification2<Model>(
+ new Job<Model, T>(job));
+ driver = new IMRUDriver<Model>(hcc, job2, initialModel, jobFactory,
+ conf, tempPath, app);
+ return driver.run();
+ }
+
/**
* @return The number of iterations performed.
*/
@@ -169,4 +530,46 @@
public Model getModel() {
return driver.getModel();
}
+
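+ /**
+ * Reassembles one serialized object from a list of chunk frames; the
+ * first chunk begins with a 4-byte length header.
+ */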
+ @SuppressWarnings("unchecked")
+ private static byte[] deserializeFromChunks(IHyracksTaskContext ctx,
+ List<ByteBuffer> chunks) throws HyracksDataException {
+ int size = chunks.get(0).getInt(0);
+ byte objectData[] = new byte[size];
+ ByteBuffer objectDataByteBuffer = ByteBuffer.wrap(objectData);
+ int remaining = size;
+ // Handle the first chunk separately, since it contains the object size.
+ int length = Math.min(chunks.get(0).array().length - BYTES_IN_INT,
+ remaining);
+ objectDataByteBuffer.put(chunks.get(0).array(), BYTES_IN_INT, length);
+ remaining -= length;
+ // Handle the remaining chunks:
+ for (int i = 1; i < chunks.size(); i++) {
+ length = Math.min(chunks.get(i).array().length, remaining);
+ objectDataByteBuffer.put(chunks.get(i).array(), 0, length);
+ remaining -= length;
+ }
+ return objectData;
+ }
+
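+ /**
+ * Splits a serialized object across frames; the first frame is prefixed
+ * with the object's total length so the receiver can reassemble it.
+ */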
+ private static void serializeToFrames(IHyracksTaskContext ctx,
+ IFrameWriter writer, byte[] objectData) throws HyracksDataException {
+ ByteBuffer frame = ctx.allocateFrame();
+ int position = 0;
+ frame.position(0);
+ while (position < objectData.length) {
+ int length = Math.min(objectData.length - position, ctx
+ .getFrameSize());
+ if (position == 0) {
+ // The first chunk is a special case, since it begins
+ // with an integer containing the length of the
+ // serialized object.
+ length = Math.min(ctx.getFrameSize() - BYTES_IN_INT, length);
+ frame.putInt(objectData.length);
+ }
+ frame.put(objectData, position, length);
+ FrameUtils.flushFrame(frame, writer);
+ position += length;
+ }
+ }
}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobTmp.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobTmp.java
new file mode 100644
index 0000000..089ca90
--- /dev/null
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/api2/IMRUJobTmp.java
@@ -0,0 +1,62 @@
+package edu.uci.ics.hyracks.imru.api2;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.imru.api.IModel;
+
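+/**
+ * The original push-based job API (renamed from IMRUJob2): the engine
+ * drives per-operator open/map/close, reduce and update callbacks.
+ */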
+public interface IMRUJobTmp<Model extends IModel, T extends Serializable>
+ extends Serializable {
+ public abstract int getCachedDataFrameSize();
+
+// public abstract ITupleParserFactory getTupleParserFactory();
+
+ public abstract void parse(IHyracksTaskContext ctx, InputStream in,
+ IFrameWriter writer) throws HyracksDataException;
+
+ public abstract boolean shouldTerminate(Model model);
+
+ public abstract void openMap(Model model, int cachedDataFrameSize)
+ throws HyracksDataException;
+
+ public abstract void map(ByteBuffer input, Model model,
+ int cachedDataFrameSize) throws HyracksDataException;
+
+ public abstract T closeMap(Model model, int cachedDataFrameSize)
+ throws HyracksDataException;
+
+ public abstract void openReduce() throws HyracksDataException;
+
+ public abstract void reduce(T input) throws HyracksDataException;
+
+ public abstract T closeReduce() throws HyracksDataException;
+
+ public abstract void openUpdate(Model model) throws HyracksDataException;
+
+ public abstract void update(T input, Model model)
+ throws HyracksDataException;
+
+ public abstract void closeUpdate(Model model) throws HyracksDataException;
+}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/MapOperatorDescriptor.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/MapOperatorDescriptor.java
index 0ea9eee..8980c1f 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/MapOperatorDescriptor.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/MapOperatorDescriptor.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
@@ -25,6 +26,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.imru.api.IIMRUJobSpecification;
import edu.uci.ics.hyracks.imru.api.IMapFunction;
+import edu.uci.ics.hyracks.imru.api.IMapFunction2;
+import edu.uci.ics.hyracks.imru.api.IMapFunctionFactory;
import edu.uci.ics.hyracks.imru.api.IModel;
import edu.uci.ics.hyracks.imru.base.IConfigurationFactory;
import edu.uci.ics.hyracks.imru.data.ChunkFrameHelper;
@@ -160,20 +163,55 @@
RunFileWriter runFileWriter = state.getRunFileWriter();
Log.info("Cached example file size is " + runFileWriter.getFileSize() + " bytes");
- RunFileReader reader = new RunFileReader(runFileWriter.getFileReference(), ctx.getIOManager(),
+ final RunFileReader reader = new RunFileReader(runFileWriter.getFileReference(), ctx.getIOManager(),
runFileWriter.getFileSize());
//readInReverse
reader.open();
- ByteBuffer inputFrame = fileCtx.allocateFrame();
+ final ByteBuffer inputFrame = fileCtx.allocateFrame();
ChunkFrameHelper chunkFrameHelper = new ChunkFrameHelper(ctx);
- IMapFunction mapFunction = imruSpec.getMapFunctionFactory().createMapFunction(chunkFrameHelper.getContext(), imruSpec.getCachedDataFrameSize(), model);
- writer = chunkFrameHelper.wrapWriter(writer, partition);
- mapFunction.open();
- mapFunction.setFrameWriter(writer);
- while (reader.nextFrame(inputFrame)) {
- mapFunction.map(inputFrame);
+ IMapFunctionFactory<Model> factory = imruSpec.getMapFunctionFactory();
+ if (factory.useAPI2()) {
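+ // Wrap the cached run file in a lazy iterator so the new map API
+ // can stream over the input frames.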
+ Iterator<ByteBuffer> input = new Iterator<ByteBuffer>() {
+ boolean read = false;
+ boolean hasData;
+ @Override
+ public void remove() {
+ }
+
+ @Override
+ public ByteBuffer next() {
+ if (!hasNext())
+ return null;
+ read = false;
+ return inputFrame;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ if (!read) {
+ hasData = reader.nextFrame(inputFrame);
+ read = true;
+ }
+ } catch (HyracksDataException e) {
+ // hasNext() cannot throw checked exceptions; fail fast
+ // instead of silently ending the input.
+ throw new IllegalStateException(e);
+ }
+ return hasData;
+ }
+ };
+ writer = chunkFrameHelper.wrapWriter(writer, partition);
+ IMapFunction2 mapFunction = factory.createMapFunction2(chunkFrameHelper.getContext(), imruSpec.getCachedDataFrameSize(), model);
+ mapFunction.map(input, writer);
+ } else {
+ IMapFunction mapFunction = factory.createMapFunction(chunkFrameHelper.getContext(), imruSpec.getCachedDataFrameSize(), model);
+ writer = chunkFrameHelper.wrapWriter(writer, partition);
+ mapFunction.open();
+ mapFunction.setFrameWriter(writer);
+ while (reader.nextFrame(inputFrame)) {
+ mapFunction.map(inputFrame);
+ }
+ mapFunction.close();
}
- mapFunction.close();
writer.close();
}
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/ReduceOperatorDescriptor.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/ReduceOperatorDescriptor.java
index e83b59f..2f52abb 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/ReduceOperatorDescriptor.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/dataflow/ReduceOperatorDescriptor.java
@@ -66,6 +66,7 @@
reduceFunction = imruSpec.getReduceFunctionFactory().createReduceFunction(chunkFrameHelper.getContext());
writer = chunkFrameHelper.wrapWriter(writer, partition);
reduceFunction.setFrameWriter(writer);
+ reduceFunction.open();
}
@Override
diff --git a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/deserialized/AbstractDeserializingIMRUJobSpecification.java b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/deserialized/AbstractDeserializingIMRUJobSpecification.java
index 227c69d..3451ef1 100644
--- a/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/deserialized/AbstractDeserializingIMRUJobSpecification.java
+++ b/imru/imru-core/src/main/java/edu/uci/ics/hyracks/imru/deserialized/AbstractDeserializingIMRUJobSpecification.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -13,6 +14,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.imru.api.IIMRUJobSpecification;
import edu.uci.ics.hyracks.imru.api.IMapFunction;
+import edu.uci.ics.hyracks.imru.api.IMapFunction2;
import edu.uci.ics.hyracks.imru.api.IMapFunctionFactory;
import edu.uci.ics.hyracks.imru.api.IModel;
import edu.uci.ics.hyracks.imru.api.IReassemblingReduceFunction;
@@ -45,12 +47,21 @@
@Override
public final IMapFunctionFactory<Model> getMapFunctionFactory() {
return new IMapFunctionFactory<Model>() {
+ @Override
+ public boolean useAPI2() {
+ return false;
+ }
+
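+ // The deserializing wrapper uses only the push-based API, so no
+ // API-2 map function is provided.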
+ @Override
+ public IMapFunction2 createMapFunction2(IHyracksTaskContext ctx,
+ int cachedDataFrameSize, Model model) {
+ return null;
+ }
@Override
public IMapFunction createMapFunction(final IHyracksTaskContext ctx, final int cachedDataFrameSize,
final Model model) {
return new IMapFunction() {
-
+
private IFrameWriter writer;
private IDeserializedMapFunction<T> mapFunction = getDeserializedMapFunctionFactory()
.createMapFunction(model, cachedDataFrameSize);
diff --git a/imru/imru-core/src/test/java/edu/uci/ics/hyracks/imru/test/ImruTest.java b/imru/imru-core/src/test/java/edu/uci/ics/hyracks/imru/test/ImruTest.java
index 4647812..307cb29 100644
--- a/imru/imru-core/src/test/java/edu/uci/ics/hyracks/imru/test/ImruTest.java
+++ b/imru/imru-core/src/test/java/edu/uci/ics/hyracks/imru/test/ImruTest.java
@@ -2,6 +2,8 @@
import java.io.File;
import java.util.EnumSet;
+import java.util.logging.Handler;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
@@ -30,7 +32,7 @@
private static NodeControllerService nc2;
private static IHyracksClientConnection hcc;
- public static void init() throws Exception {
+ public static void startControllers() throws Exception {
CCConfig ccConfig = new CCConfig();
ccConfig.clientNetIpAddress = CC_HOST;
ccConfig.clusterNetIpAddress = CC_HOST;
@@ -71,6 +73,13 @@
ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
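+ /** Removes all handlers from the root logger to silence console output. */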
+ public static void disableLogging() throws Exception {
+ Logger globalLogger = Logger.getLogger("");
+ Handler[] handlers = globalLogger.getHandlers();
+ for (Handler handler : handlers)
+ globalLogger.removeHandler(handler);
+ }
+
public static void destroyApp(String hyracksAppName) throws Exception {
hcc.destroyApplication(hyracksAppName);
}
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/BGDMain.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/BGDMain.java
index 3e94392..70e0d98 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/BGDMain.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/BGDMain.java
@@ -85,7 +85,7 @@
+ " -cluster-conf imru/imru-core/src/main/resources/conf/cluster.conf"//
+ " -example-paths /input/data.txt")
.split(" ");
- ImruTest.init();
+ ImruTest.startControllers();
ImruTest.createApp("bgd", new File(
"imru/imru-example/src/main/resources/bgd.zip"));
}
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/job/BGDMapFunctionFactory.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/job/BGDMapFunctionFactory.java
index defd0b9..9f97d4a 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/job/BGDMapFunctionFactory.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd/job/BGDMapFunctionFactory.java
@@ -13,6 +13,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
import edu.uci.ics.hyracks.imru.api.IMapFunction;
+import edu.uci.ics.hyracks.imru.api.IMapFunction2;
import edu.uci.ics.hyracks.imru.api.IMapFunctionFactory;
import edu.uci.ics.hyracks.imru.example.bgd.data.FragmentableFloatArray;
import edu.uci.ics.hyracks.imru.example.bgd.data.LinearExample;
@@ -20,7 +21,15 @@
import edu.uci.ics.hyracks.imru.example.bgd.data.RecordDescriptorUtils;
public class BGDMapFunctionFactory implements IMapFunctionFactory<LinearModel> {
-
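+ // This job uses only the push-based map API, so no API-2 map
+ // function is provided.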
+ @Override
+ public IMapFunction2 createMapFunction2(IHyracksTaskContext ctx,
+ int cachedDataFrameSize, LinearModel model) {
+ return null;
+ }
+
+ @Override
+ public boolean useAPI2() {
+ return false;
+ }
@Override
public IMapFunction createMapFunction(final IHyracksTaskContext ctx, final int cachedDataFrameSize, final LinearModel model) {
return new IMapFunction() {
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGDMain2.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGDMain2.java
index 80c5c67..cdb2a3a 100644
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGDMain2.java
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/BGDMain2.java
@@ -1,12 +1,19 @@
package edu.uci.ics.hyracks.imru.example.bgd2;
+import java.io.BufferedReader;
+import java.io.DataOutput;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
+import java.util.Scanner;
import java.util.logging.Handler;
import java.util.logging.LogManager;
import java.util.logging.Logger;
+import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -15,11 +22,16 @@
import org.kohsuke.args4j.Option;
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-import edu.uci.ics.hyracks.imru.api2.IMRUJob2;
+import edu.uci.ics.hyracks.imru.api2.IMRUJobTmp;
import edu.uci.ics.hyracks.imru.api2.IMRUJobControl;
import edu.uci.ics.hyracks.imru.base.IJobFactory;
import edu.uci.ics.hyracks.imru.example.bgd.R;
@@ -34,7 +46,7 @@
*
* @author Josh Rosen
*/
-public class BGDMain2 implements IMRUJob2<LinearModel, LossGradient> {
+public class BGDMain2 implements IMRUJobTmp<LinearModel, LossGradient> {
private final int numFeatures;
public BGDMain2(int numFeatures) {
@@ -50,7 +62,7 @@
@Override
public void openMap(LinearModel model, int cachedDataFrameSize)
throws HyracksDataException {
- R.p("openMap");
+ R.p("openMap " + this);
lossGradientMap = new LossGradient();
lossGradientMap.loss = 0.0f;
lossGradientMap.gradient = new float[model.numFeatures];
@@ -64,7 +76,7 @@
throws HyracksDataException {
accessor.reset(input);
int tupleCount = accessor.getTupleCount();
- R.p("map "+tupleCount);
+ R.p("map " + tupleCount + " " + this);
for (int i = 0; i < tupleCount; i++) {
example.reset(accessor, i);
float innerProduct = example.dot(model.weights);
@@ -79,18 +91,19 @@
@Override
public LossGradient closeMap(LinearModel model, int cachedDataFrameSize)
throws HyracksDataException {
- R.p("closeMap");
+ R.p("closeMap " + this);
return lossGradientMap;
}
@Override
public void openReduce() throws HyracksDataException {
- R.p("openReduce");
+ R.p("openReduce " + this);
+ // Debugging aid: print the call stack that reaches openReduce.
+ new Error().printStackTrace();
}
@Override
public void reduce(LossGradient input) throws HyracksDataException {
- R.p("reduce");
+ R.p("reduce " + this);
if (lossGradientReduce == null) {
lossGradientReduce = input;
} else {
@@ -103,19 +116,19 @@
@Override
public LossGradient closeReduce() throws HyracksDataException {
- R.p("closeReduce");
+ R.p("closeReduce " + this);
return lossGradientReduce;
}
@Override
public void openUpdate(LinearModel model) throws HyracksDataException {
- R.p("openUpdate");
+ R.p("openUpdate " + this);
}
@Override
public void update(LossGradient input, LinearModel model)
throws HyracksDataException {
- R.p("update");
+ R.p("update " + this);
if (lossGradientUpdate == null) {
lossGradientUpdate = input;
} else {
@@ -128,7 +141,7 @@
@Override
public void closeUpdate(LinearModel model) throws HyracksDataException {
- R.p("closeUpdate");
+ R.p("closeUpdate " + this);
// Update loss
model.loss = lossGradientUpdate.loss;
model.loss += model.regularizationConstant * norm(model.weights.array);
@@ -155,7 +168,7 @@
@Override
public int getCachedDataFrameSize() {
- return 1024 * 1024;
+ return 4 * 1024;
}
@Override
@@ -164,8 +177,88 @@
}
@Override
- public ITupleParserFactory getTupleParserFactory() {
- return new LibsvmExampleTupleParserFactory(numFeatures);
+ public void parse(IHyracksTaskContext ctx, InputStream in,
+ IFrameWriter writer) throws HyracksDataException {
+ ByteBuffer frame = ctx.allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(frame, true);
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ int activeFeatures = 0;
+ try {
+ Pattern whitespacePattern = Pattern.compile("\\s+");
+ Pattern labelFeaturePattern = Pattern.compile("[:=]");
+ String line;
+ boolean firstLine = true;
+ while (true) {
+ tb.reset();
+ if (firstLine) {
+ long start = System.currentTimeMillis();
+ line = reader.readLine();
+ long end = System.currentTimeMillis();
+ // LOG.info("First call to reader.readLine() took " + (end -
+ // start) + " milliseconds");
+ firstLine = false;
+ } else {
+ line = reader.readLine();
+ }
+ if (line == null) {
+ break;
+ }
+ String[] comps = whitespacePattern.split(line, 2);
+
+ // Label
+ // Ignore leading plus sign
+ if (comps[0].charAt(0) == '+') {
+ comps[0] = comps[0].substring(1);
+ }
+
+ int label = Integer.parseInt(comps[0]);
+ dos.writeInt(label);
+ tb.addFieldEndOffset();
+ Scanner scan = new Scanner(comps[1]);
+ scan.useDelimiter(",|\\s+");
+ while (scan.hasNext()) {
+ String[] parts = labelFeaturePattern.split(scan.next());
+ int index = Integer.parseInt(parts[0]);
+ if (index > numFeatures) {
+ throw new IndexOutOfBoundsException("Feature index "
+ + index
+ + " exceed the declared number of features ("
+ + numFeatures + ")");
+ }
+ // Ignore leading plus sign.
+ if (parts[1].charAt(0) == '+') {
+ parts[1] = parts[1].substring(1);
+ }
+ float value = Float.parseFloat(parts[1]);
+ dos.writeInt(index);
+ dos.writeFloat(value);
+ activeFeatures++;
+ }
+ dos.writeInt(-1); // Marks the end of the sparse array.
+ tb.addFieldEndOffset();
+ if (!appender.append(tb.getFieldEndOffsets(),
+ tb.getByteArray(), 0, tb.getSize())) {
+ FrameUtils.flushFrame(frame, writer);
+ appender.reset(frame, true);
+ if (!appender.append(tb.getFieldEndOffsets(), tb
+ .getByteArray(), 0, tb.getSize())) {
+ // LOG.severe("Example too large to fit in frame: " +
+ // line);
+ throw new IllegalStateException();
+ }
+ }
+ }
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(frame, writer);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ // LOG.info("Parsed input partition containing " + activeFeatures +
+ // " active features");
}
private static class Options {
@@ -209,11 +302,6 @@
public static void main(String[] args) throws Exception {
try {
- Logger globalLogger = Logger.getLogger("");
- Handler[] handlers = globalLogger.getHandlers();
- for (Handler handler : handlers) {
- globalLogger.removeHandler(handler);
- }
if (args.length == 0) {
args = ("-host localhost"//
+ " -app bgd"//
@@ -226,9 +314,10 @@
+ " -model-file /tmp/__imru.txt"//
+ " -cluster-conf imru/imru-core/src/main/resources/conf/cluster.conf"//
+ " -example-paths /input/data.txt").split(" ");
- ImruTest.init();
+ ImruTest.startControllers();
ImruTest.createApp("bgd", new File(
- "imru/imru-example/src/main/resources/bgd.zip"));
+ "imru/imru-example/src/main/resources/bootstrap.zip"));
+ ImruTest.disableLogging();
}
Options options = new Options();
CmdLineParser parser = new CmdLineParser(options);
@@ -241,31 +330,19 @@
// copy input files to HDFS
FileSystem dfs = FileSystem.get(control.conf);
- for (FileStatus f : dfs.listStatus(new Path("/tmp")))
- dfs.delete(f.getPath());
+ if (dfs.listStatus(new Path("/tmp")) != null)
+ for (FileStatus f : dfs.listStatus(new Path("/tmp")))
+ dfs.delete(f.getPath());
dfs.copyFromLocalFile(new Path("/data/imru/test/data.txt"),
new Path("/input/data.txt"));
- IJobFactory jobFactory;
-
if (options.aggTreeType.equals("none")) {
- jobFactory = new NoAggregationIMRUJobFactory(
- options.examplePaths, control.confFactory);
+ control.selectNoAggregation(options.examplePaths);
} else if (options.aggTreeType.equals("generic")) {
- if (options.aggCount < 1) {
- throw new IllegalArgumentException(
- "Must specify a nonnegative aggregator count using the -agg-count option");
- }
- jobFactory = new GenericAggregationIMRUJobFactory(
- options.examplePaths, control.confFactory,
+ control.selectGenericAggregation(options.examplePaths,
options.aggCount);
} else if (options.aggTreeType.equals("nary")) {
- if (options.fanIn < 1) {
- throw new IllegalArgumentException(
- "Must specify nonnegative -fan-in");
- }
- jobFactory = new NAryAggregationIMRUJobFactory(
- options.examplePaths, control.confFactory,
+ control.selectNAryAggregation(options.examplePaths,
options.fanIn);
} else {
throw new IllegalArgumentException(
@@ -274,8 +351,8 @@
LinearModel initalModel = new LinearModel(8000, options.numRounds);
- JobStatus status = control.run(job, initalModel, jobFactory,
- options.tempPath, options.app);
+ JobStatus status = control.run(job, initalModel, options.tempPath,
+ options.app);
if (status == JobStatus.FAILURE) {
System.err.println("Job failed; see CC and NC logs");
System.exit(-1);
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/LibsvmExampleTupleParserFactory.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/LibsvmExampleTupleParserFactory.java
deleted file mode 100644
index 66b8b69..0000000
--- a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/bgd2/LibsvmExampleTupleParserFactory.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package edu.uci.ics.hyracks.imru.example.bgd2;
-
-import java.io.BufferedReader;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.ByteBuffer;
-import java.util.Scanner;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-public class LibsvmExampleTupleParserFactory implements ITupleParserFactory {
- private static final long serialVersionUID = 1L;
- final int featureLength;
- private static Logger LOG = Logger.getLogger(LibsvmExampleTupleParserFactory.class.getName());
-
- public LibsvmExampleTupleParserFactory(int featureLength) {
- this.featureLength = featureLength;
- }
-
- @Override
- public ITupleParser createTupleParser(final IHyracksTaskContext ctx) {
- return new ITupleParser() {
-
- @Override
- public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
- ByteBuffer frame = ctx.allocateFrame();
- FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
- appender.reset(frame, true);
- ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
- DataOutput dos = tb.getDataOutput();
- BufferedReader reader = new BufferedReader(new InputStreamReader(in));
- int activeFeatures = 0;
- try {
- Pattern whitespacePattern = Pattern.compile("\\s+");
- Pattern labelFeaturePattern = Pattern.compile("[:=]");
- String line;
- boolean firstLine = true;
- while (true) {
- tb.reset();
- if (firstLine) {
- long start = System.currentTimeMillis();
- line = reader.readLine();
- long end = System.currentTimeMillis();
- LOG.info("First call to reader.readLine() took " + (end - start) + " milliseconds");
- firstLine = false;
- } else {
- line = reader.readLine();
- }
- if (line == null) {
- break;
- }
- String[] comps = whitespacePattern.split(line, 2);
-
- // Label
- // Ignore leading plus sign
- if (comps[0].charAt(0) == '+') {
- comps[0] = comps[0].substring(1);
- }
-
- int label = Integer.parseInt(comps[0]);
- dos.writeInt(label);
- tb.addFieldEndOffset();
- Scanner scan = new Scanner(comps[1]);
- scan.useDelimiter(",|\\s+");
- while (scan.hasNext()) {
- String[] parts = labelFeaturePattern.split(scan.next());
- int index = Integer.parseInt(parts[0]);
- if (index > featureLength) {
- throw new IndexOutOfBoundsException("Feature index " + index
- + " exceed the declared number of features (" + featureLength + ")");
- }
- // Ignore leading plus sign.
- if (parts[1].charAt(0) == '+') {
- parts[1] = parts[1].substring(1);
- }
- float value = Float.parseFloat(parts[1]);
- dos.writeInt(index);
- dos.writeFloat(value);
- activeFeatures++;
- }
- dos.writeInt(-1); // Marks the end of the sparse array.
- tb.addFieldEndOffset();
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- FrameUtils.flushFrame(frame, writer);
- appender.reset(frame, true);
- if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- LOG.severe("Example too large to fit in frame: " + line);
- throw new IllegalStateException();
- }
- }
- }
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(frame, writer);
- }
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- LOG.info("Parsed input partition containing " + activeFeatures + " active features");
- }
- };
- }
-}
\ No newline at end of file
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIncrementalResult.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIncrementalResult.java
new file mode 100644
index 0000000..4472f11
--- /dev/null
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldIncrementalResult.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.imru.example.helloworld;
+
+import java.io.Serializable;
+
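+/** Intermediate result: the total number of characters seen by one map or reduce task. */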
+public class HelloWorldIncrementalResult implements Serializable {
+ public int length;
+}
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldModel.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldModel.java
new file mode 100644
index 0000000..3728caa
--- /dev/null
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/HelloWorldModel.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.hyracks.imru.example.helloworld;
+
+import edu.uci.ics.hyracks.imru.api.IModel;
+
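+/**
+ * Global model for the helloworld example: the combined length of all
+ * words plus a countdown of remaining iterations.
+ */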
+public class HelloWorldModel implements IModel {
+ public int totalLength;
+ public int roundsRemaining = 5;
+}
diff --git a/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/IMRUHelloWorld.java b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/IMRUHelloWorld.java
new file mode 100644
index 0000000..e48c43c
--- /dev/null
+++ b/imru/imru-example/src/main/java/edu/uci/ics/hyracks/imru/example/helloworld/IMRUHelloWorld.java
@@ -0,0 +1,229 @@
+package edu.uci.ics.hyracks.imru.example.helloworld;
+
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.imru.api2.IMRUJob;
+import edu.uci.ics.hyracks.imru.api2.IMRUJobControl;
+import edu.uci.ics.hyracks.imru.example.bgd.R;
+import edu.uci.ics.hyracks.imru.test.ImruTest;
+
+public class IMRUHelloWorld extends
+ IMRUJob<HelloWorldModel, HelloWorldIncrementalResult> {
+ static AtomicInteger nextId = new AtomicInteger();
+ int id;
+
+ public IMRUHelloWorld() {
+ this.id = nextId.getAndIncrement();
+ }
+
+ @Override
+ public HelloWorldModel initModel() {
+ return new HelloWorldModel();
+ }
+
+ @Override
+ public int getCachedDataFrameSize() {
+ return 256;
+ }
+
+ /**
+ * Parse input data and create frames
+ */
+ @Override
+ public void parse(IHyracksTaskContext ctx, InputStream in,
+ IFrameWriter writer) throws HyracksDataException {
+ ByteBuffer frame = ctx.allocateFrame();
+ FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ appender.reset(frame, true);
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
+ DataOutput dos = tb.getDataOutput();
+ try {
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(in));
+ // The example input is a single line of space-separated words.
+ String line = reader.readLine();
+ reader.close();
+ if (line == null)
+ return;
+ for (String s : line.split(" ")) {
+ // create a new frame
+ appender.reset(frame, true);
+
+ tb.reset();
+ // add one field
+ dos.writeUTF(s);
+ tb.addFieldEndOffset();
+ // add another field
+ dos.writeUTF(s + "_copy");
+ tb.addFieldEndOffset();
+ if (!appender.append(tb.getFieldEndOffsets(),
+ tb.getByteArray(), 0, tb.getSize())) {
+ // if frame can't hold this tuple
+ throw new IllegalStateException(
+ "Example too large to fit in frame: " + line);
+ }
+ FrameUtils.flushFrame(frame, writer);
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public HelloWorldIncrementalResult map(Iterator<ByteBuffer> input,
+ HelloWorldModel model, int cachedDataFrameSize)
+ throws HyracksDataException {
+ HelloWorldIncrementalResult mapResult;
+ IFrameTupleAccessor accessor;
+ R.p("openMap" + id+" model="+ model.totalLength);
+ mapResult = new HelloWorldIncrementalResult();
+ accessor = new FrameTupleAccessor(cachedDataFrameSize,
+ new RecordDescriptor(new ISerializerDeserializer[2]));
+ while (input.hasNext()) {
+ ByteBuffer buf = input.next();
+ try {
+ accessor.reset(buf);
+ int tupleCount = accessor.getTupleCount();
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream di = new DataInputStream(bbis);
+ for (int i = 0; i < tupleCount; i++) {
+ int fieldId = 0;
+ int startOffset = accessor.getFieldSlotsLength()
+ + accessor.getTupleStartOffset(i)
+ + accessor.getFieldStartOffset(i, fieldId);
+ bbis.setByteBuffer(accessor.getBuffer(), startOffset);
+ String word = di.readUTF();
+ R.p("map%d read frame: %s", id, word);
+ mapResult.length += word.length();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ R.p("closeMap" + id + " output=" + mapResult.length);
+ return mapResult;
+ }
+
+ @Override
+ public HelloWorldIncrementalResult reduce(
+ Iterator<HelloWorldIncrementalResult> input)
+ throws HyracksDataException {
+ HelloWorldIncrementalResult reduceResult;
+ R.p("openReduce" + id);
+ reduceResult = new HelloWorldIncrementalResult();
+ while (input.hasNext()) {
+ HelloWorldIncrementalResult result = input.next();
+ R.p("reduce" + id + " input=" + result.length);
+ reduceResult.length += result.length;
+ }
+ R.p("closeReduce" + id + " output=" + reduceResult.length);
+ return reduceResult;
+ }
+
+ @Override
+ public void update(Iterator<HelloWorldIncrementalResult> input,
+ HelloWorldModel model) throws HyracksDataException {
+ R.p("openUpdate" + id + " input=" + model.totalLength);
+ while (input.hasNext()) {
+ HelloWorldIncrementalResult result = input.next();
+ R.p("update" + id);
+ model.totalLength += result.length;
+ }
+ R.p("closeUpdate" + id + " output=" + model.totalLength);
+ model.roundsRemaining--;
+ }
+
+ /**
+ * Return true to exit loop
+ */
+ @Override
+ public boolean shouldTerminate(HelloWorldModel model) {
+ return model.roundsRemaining == 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ try {
+ boolean debugging = true; // start everything in one process
+ ImruTest.disableLogging();
+ String host = "localhost";
+ int port = 3099;
+ String app = "imru_helloworld";
+
+ // Hadoop 0.20.2 needs to be started
+ String hadoopConfPath = "/data/imru/hadoop-0.20.2/conf";
+
+ // HDFS directory that holds the intermediate models
+ String tempPath = "/helloworld";
+
+ // config file that lists the node names
+ String clusterConfPath = "imru/imru-core/src/main/resources/conf/cluster.conf";
+
+ // HDFS path of the input data
+ String examplePaths = "/helloworld/input.txt";
+ if (debugging) {
+ ImruTest.startControllers();
+ // Minimum config file to invoke
+ // edu.uci.ics.hyracks.imru.runtime.bootstrap.IMRUNCBootstrapImpl
+ ImruTest.createApp(app, new File(
+ "imru/imru-example/src/main/resources/bootstrap.zip"));
+ }
+ IMRUHelloWorld job = new IMRUHelloWorld();
+ IMRUJobControl<HelloWorldModel, HelloWorldIncrementalResult> control = new IMRUJobControl<HelloWorldModel, HelloWorldIncrementalResult>();
+ control.connect(host, port, hadoopConfPath, clusterConfPath);
+
+ // remove old intermediate models
+ FileSystem dfs = FileSystem.get(control.conf);
+ if (dfs.listStatus(new Path(tempPath)) != null)
+ for (FileStatus f : dfs.listStatus(new Path(tempPath)))
+ dfs.delete(f.getPath());
+
+ // create input file
+ FSDataOutputStream out = dfs.create(new Path(examplePaths), true);
+ out.write("hello world".getBytes());
+ out.close();
+
+ // set aggregation type
+ // control.selectNAryAggregation(examplePaths, 2);
+ control.selectGenericAggregation(examplePaths, 1);
+
+ JobStatus status = control.run(job, tempPath, app);
+ if (status == JobStatus.FAILURE) {
+ System.err.println("Job failed; see CC and NC logs");
+ System.exit(-1);
+ }
+ int iterationCount = control.getIterationCount();
+ HelloWorldModel finalModel = control.getModel();
+ System.out.println("Terminated after " + iterationCount
+ + " iterations");
+ R.p("FinalModel: " + finalModel.totalLength);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ System.exit(0);
+ }
+
+}
diff --git a/imru/imru-example/src/main/resources/bgd.zip b/imru/imru-example/src/main/resources/bootstrap.zip
similarity index 100%
rename from imru/imru-example/src/main/resources/bgd.zip
rename to imru/imru-example/src/main/resources/bootstrap.zip
Binary files differ