Added support for central queue
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_online_aggregation@219 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
index 5049f0e..ba0c927 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
@@ -81,7 +81,7 @@
int jobId = (int) System.currentTimeMillis();
MapperOperatorDescriptor<Writable, Writable, Writable, Writable> mapper = new MapperOperatorDescriptor<Writable, Writable, Writable, Writable>(
- spec, jobId, mConfig, new OnlineInputSplitProviderFactory(mConfig));
+ spec, jobId, mConfig, new OnlineInputSplitProviderFactory());
mapper.setPartitionConstraint(new PartitionCountConstraint(options.numMaps));
ReducerOperatorDescriptor<Writable, Writable, Writable, Writable> reducer = new ReducerOperatorDescriptor<Writable, Writable, Writable, Writable>(
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CCBootstrap.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CCBootstrap.java
index cdfe429..52b9edd 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CCBootstrap.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CCBootstrap.java
@@ -1,26 +1,45 @@
package edu.uci.ics.hyracks.examples.onlineaggregation;
-import edu.uci.ics.hyracks.api.application.IApplicationContext;
-import edu.uci.ics.hyracks.api.application.IBootstrap;
+import java.util.List;
+import java.util.UUID;
-public class CCBootstrap implements IBootstrap {
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
- @Override
- public void start() throws Exception {
- // TODO Auto-generated method stub
+public class CCBootstrap implements ICCBootstrap {
+ private ICCApplicationContext appCtx;
- }
+ @Override
+ public void start() throws Exception {
+ final CentralInputSplitQueue queue = new CentralInputSplitQueue();
+ appCtx.setDistributedState(queue);
+ appCtx.addJobLifecycleListener(new IJobLifecycleListener() {
+ @Override
+ public void notifyJobStart(UUID jobId) {
+ }
- @Override
- public void stop() throws Exception {
- // TODO Auto-generated method stub
+ @Override
+ public void notifyJobFinish(UUID jobId) {
+ }
- }
+ @Override
+ public void notifyJobCreation(UUID jobId, JobSpecification jobSpec) {
+ List<OnlineFileSplit> splits = null;
+ // TODO: Add splits into the queue
+ queue.addSplits(jobId, splits);
+ }
+ });
+ }
- @Override
- public void setApplicationContext(IApplicationContext appCtx) {
- // TODO Auto-generated method stub
+ @Override
+ public void stop() throws Exception {
- }
+ }
-}
+ @Override
+ public void setApplicationContext(ICCApplicationContext appCtx) {
+ this.appCtx = appCtx;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralInputSplitQueue.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralInputSplitQueue.java
new file mode 100644
index 0000000..1132d97
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralInputSplitQueue.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CentralInputSplitQueue extends UnicastRemoteObject implements IInputSplitQueue {
+ private static final long serialVersionUID = 1L;
+
+ private Map<UUID, LinkedList<OnlineFileSplit>> jobQueues;
+
+ public CentralInputSplitQueue() throws RemoteException {
+ jobQueues = new Hashtable<UUID, LinkedList<OnlineFileSplit>>();
+ }
+
+ @Override
+ public MarshalledWritable<OnlineFileSplit> getNext(UUID jobId) throws Exception {
+ LinkedList<OnlineFileSplit> splitQueue = jobQueues.get(jobId);
+ if (splitQueue == null) {
+ return null;
+ }
+ OnlineFileSplit nextSplit = null;
+ synchronized (splitQueue) {
+ nextSplit = !splitQueue.isEmpty() ? splitQueue.remove() : null;
+ }
+ if (nextSplit != null) {
+ MarshalledWritable<OnlineFileSplit> w = new MarshalledWritable<OnlineFileSplit>();
+ w.set(nextSplit);
+ return w;
+ }
+ return null;
+ }
+
+ public void addSplits(UUID jobId, List<OnlineFileSplit> splits) {
+ LinkedList<OnlineFileSplit> splitQueue = new LinkedList<OnlineFileSplit>(splits);
+ jobQueues.put(jobId, splitQueue);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralQueueAccessor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralQueueAccessor.java
new file mode 100644
index 0000000..dc3d90d
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralQueueAccessor.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+
+public class CentralQueueAccessor {
+ private static IInputSplitQueue QUEUE = null;
+
+ public static void initialize(INCApplicationContext appCtx) {
+ QUEUE = (IInputSplitQueue) appCtx.getDestributedState();
+ }
+
+ public static IInputSplitQueue getQueue() {
+ return QUEUE;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitQueue.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitQueue.java
new file mode 100644
index 0000000..ed747a7
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitQueue.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.rmi.Remote;
+import java.util.UUID;
+
+public interface IInputSplitQueue extends Remote {
+ public MarshalledWritable<OnlineFileSplit> getNext(UUID jobId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProvider.java
similarity index 80%
rename from hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java
rename to hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProvider.java
index 8c99d17..0a33636 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProvider.java
@@ -14,9 +14,8 @@
*/
package edu.uci.ics.hyracks.examples.onlineaggregation;
-import org.apache.hadoop.mapreduce.InputSplit;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IInputSplitProvider<IS extends InputSplit> {
- public IS next();
+public interface IOnlineInputSplitProvider {
+ public OnlineFileSplit next() throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProviderFactory.java
similarity index 79%
rename from hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
rename to hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProviderFactory.java
index 1f9adba..60731b2 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProviderFactory.java
@@ -15,11 +15,10 @@
package edu.uci.ics.hyracks.examples.onlineaggregation;
import java.io.Serializable;
-
-import org.apache.hadoop.mapreduce.InputSplit;
+import java.util.UUID;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-public interface IInputSplitProviderFactory<P extends IInputSplitProvider> extends Serializable {
- public P create(int id) throws HyracksDataException;
+public interface IOnlineInputSplitProviderFactory extends Serializable {
+ public IOnlineInputSplitProvider createInputSplitProvider(UUID jobId, int id) throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
index 78666a2..d2f6f16 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
@@ -21,7 +21,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -48,10 +47,10 @@
private static final long serialVersionUID = 1L;
private final int jobId;
private final MarshalledWritable<Configuration> config;
- private final OnlineInputSplitProviderFactory factory;
+ private final IOnlineInputSplitProviderFactory factory;
public MapperOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> config,
- OnlineInputSplitProviderFactory factory) throws HyracksDataException {
+ IOnlineInputSplitProviderFactory factory) throws HyracksDataException {
super(spec, 0, 1);
this.jobId = jobId;
this.config = config;
@@ -68,7 +67,7 @@
final Configuration conf = helper.getConfiguration();
final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
- final OnlineInputSplitProvider isp = factory.create(partition);
+ final IOnlineInputSplitProvider isp = factory.createInputSplitProvider(ctx.getJobId(), partition);
final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
@@ -161,7 +160,8 @@
OnlineFileSplit split = null;
while ((split = isp.next()) != null) {
try {
- RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split, taskAttemptContext);
+ RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
+ taskAttemptContext);
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/NCBootstrap.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/NCBootstrap.java
index f3f8524..dc89300 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/NCBootstrap.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/NCBootstrap.java
@@ -1,26 +1,22 @@
package edu.uci.ics.hyracks.examples.onlineaggregation;
-import edu.uci.ics.hyracks.api.application.IApplicationContext;
-import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
-public class NCBootstrap implements IBootstrap {
+public class NCBootstrap implements INCBootstrap {
+ INCApplicationContext appCtx;
- @Override
- public void start() throws Exception {
- // TODO Auto-generated method stub
+ @Override
+ public void start() throws Exception {
+ CentralQueueAccessor.initialize(appCtx);
+ }
- }
+ @Override
+ public void stop() throws Exception {
+ }
- @Override
- public void stop() throws Exception {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setApplicationContext(IApplicationContext appCtx) {
- // TODO Auto-generated method stub
-
- }
-
-}
+ @Override
+ public void setApplicationContext(INCApplicationContext appCtx) {
+ this.appCtx = appCtx;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineFileSplit.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineFileSplit.java
index ebebc1c..9fb64e1 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineFileSplit.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineFileSplit.java
@@ -5,7 +5,6 @@
import java.io.IOException;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class OnlineFileSplit extends FileSplit {
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProvider.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProvider.java
index 5f9ccb6..5250ef6 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProvider.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProvider.java
@@ -1,18 +1,28 @@
package edu.uci.ics.hyracks.examples.onlineaggregation;
-import java.util.Queue;
+import java.util.UUID;
-public class OnlineInputSplitProvider implements IInputSplitProvider<OnlineFileSplit> {
-
- private Queue<OnlineFileSplit> queue;
-
- public OnlineInputSplitProvider(Queue<OnlineFileSplit> queue) {
- this.queue = queue;
- }
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
- @Override
- public OnlineFileSplit next() {
- return this.queue.size() > 0 ? this.queue.remove() : null;
- }
+public class OnlineInputSplitProvider implements IOnlineInputSplitProvider {
+ private UUID jobId;
+ private IInputSplitQueue queue;
-}
+ public OnlineInputSplitProvider(UUID jobId, IInputSplitQueue queue) {
+ this.jobId = jobId;
+ this.queue = queue;
+ }
+
+ @Override
+ public OnlineFileSplit next() throws HyracksDataException {
+ try {
+ MarshalledWritable<OnlineFileSplit> w = queue.getNext(jobId);
+ if (w == null) {
+ return null;
+ }
+ return w.get();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProviderFactory.java
index 0cdb582..950ed3d 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProviderFactory.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProviderFactory.java
@@ -14,36 +14,19 @@
*/
package edu.uci.ics.hyracks.examples.onlineaggregation;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
+import java.util.UUID;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-public class OnlineInputSplitProviderFactory implements IInputSplitProviderFactory<OnlineInputSplitProvider> {
+public class OnlineInputSplitProviderFactory implements IOnlineInputSplitProviderFactory {
private static final long serialVersionUID = 1L;
- private MarshalledWritable<Configuration> mConfig;
-
- private Queue<OnlineFileSplit> splits;
-
- public OnlineInputSplitProviderFactory(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
- this.mConfig = mConfig;
- this.splits = new LinkedBlockingQueue<OnlineFileSplit>();
-
- try {
- HadoopHelper helper = new HadoopHelper(mConfig);
- JobContext jCtx = helper.createJobContext();
- // TODO: this.splits.addAll(helper.getInputFormat().getSplits(jCtx));
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
+ public OnlineInputSplitProviderFactory() {
}
@Override
- public OnlineInputSplitProvider create(int id) throws HyracksDataException {
- return new OnlineInputSplitProvider(this.splits);
+ public IOnlineInputSplitProvider createInputSplitProvider(UUID jobId, int id) throws HyracksDataException {
+ IInputSplitQueue queue = CentralQueueAccessor.getQueue();
+ return new OnlineInputSplitProvider(jobId, queue);
}
}
\ No newline at end of file