Added support for central queue

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_online_aggregation@219 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
index 5049f0e..ba0c927 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationclient/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/client/MapReduceMain.java
@@ -81,7 +81,7 @@
         int jobId = (int) System.currentTimeMillis();
 
         MapperOperatorDescriptor<Writable, Writable, Writable, Writable> mapper = new MapperOperatorDescriptor<Writable, Writable, Writable, Writable>(
-                spec, jobId, mConfig, new OnlineInputSplitProviderFactory(mConfig));
+                spec, jobId, mConfig, new OnlineInputSplitProviderFactory());
         mapper.setPartitionConstraint(new PartitionCountConstraint(options.numMaps));
 
         ReducerOperatorDescriptor<Writable, Writable, Writable, Writable> reducer = new ReducerOperatorDescriptor<Writable, Writable, Writable, Writable>(
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CCBootstrap.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CCBootstrap.java
index cdfe429..52b9edd 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CCBootstrap.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CCBootstrap.java
@@ -1,26 +1,45 @@
 package edu.uci.ics.hyracks.examples.onlineaggregation;
 
-import edu.uci.ics.hyracks.api.application.IApplicationContext;
-import edu.uci.ics.hyracks.api.application.IBootstrap;
+import java.util.List;
+import java.util.UUID;
 
-public class CCBootstrap implements IBootstrap {
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
 
-	@Override
-	public void start() throws Exception {
-		// TODO Auto-generated method stub
+public class CCBootstrap implements ICCBootstrap {
+    private ICCApplicationContext appCtx;
 
-	}
+    @Override
+    public void start() throws Exception {
+        final CentralInputSplitQueue queue = new CentralInputSplitQueue();
+        appCtx.setDistributedState(queue);
+        appCtx.addJobLifecycleListener(new IJobLifecycleListener() {
+            @Override
+            public void notifyJobStart(UUID jobId) {
+            }
 
-	@Override
-	public void stop() throws Exception {
-		// TODO Auto-generated method stub
+            @Override
+            public void notifyJobFinish(UUID jobId) {
+            }
 
-	}
+            @Override
+            public void notifyJobCreation(UUID jobId, JobSpecification jobSpec) {
+                List<OnlineFileSplit> splits = null;
+                // TODO: Add splits into the queue
+                queue.addSplits(jobId, splits);
+            }
+        });
+    }
 
-	@Override
-	public void setApplicationContext(IApplicationContext appCtx) {
-		// TODO Auto-generated method stub
+    @Override
+    public void stop() throws Exception {
 
-	}
+    }
 
-}
+    @Override
+    public void setApplicationContext(ICCApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralInputSplitQueue.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralInputSplitQueue.java
new file mode 100644
index 0000000..1132d97
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralInputSplitQueue.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CentralInputSplitQueue extends UnicastRemoteObject implements IInputSplitQueue {
+    private static final long serialVersionUID = 1L;
+
+    private Map<UUID, LinkedList<OnlineFileSplit>> jobQueues;
+
+    public CentralInputSplitQueue() throws RemoteException {
+        jobQueues = new Hashtable<UUID, LinkedList<OnlineFileSplit>>();
+    }
+
+    @Override
+    public MarshalledWritable<OnlineFileSplit> getNext(UUID jobId) throws Exception {
+        LinkedList<OnlineFileSplit> splitQueue = jobQueues.get(jobId);
+        if (splitQueue == null) {
+            return null;
+        }
+        OnlineFileSplit nextSplit = null;
+        synchronized (splitQueue) {
+            nextSplit = !splitQueue.isEmpty() ? splitQueue.remove() : null;
+        }
+        if (nextSplit != null) {
+            MarshalledWritable<OnlineFileSplit> w = new MarshalledWritable<OnlineFileSplit>();
+            w.set(nextSplit);
+            return w;
+        }
+        return null;
+    }
+
+    public void addSplits(UUID jobId, List<OnlineFileSplit> splits) {
+        LinkedList<OnlineFileSplit> splitQueue = new LinkedList<OnlineFileSplit>(splits);
+        jobQueues.put(jobId, splitQueue);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralQueueAccessor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralQueueAccessor.java
new file mode 100644
index 0000000..dc3d90d
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/CentralQueueAccessor.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+
+public class CentralQueueAccessor {
+    private static IInputSplitQueue QUEUE = null;
+
+    public static void initialize(INCApplicationContext appCtx) {
+        QUEUE = (IInputSplitQueue) appCtx.getDestributedState();
+    }
+
+    public static IInputSplitQueue getQueue() {
+        return QUEUE;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitQueue.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitQueue.java
new file mode 100644
index 0000000..ed747a7
--- /dev/null
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitQueue.java
@@ -0,0 +1,8 @@
+package edu.uci.ics.hyracks.examples.onlineaggregation;
+
+import java.rmi.Remote;
+import java.util.UUID;
+
+public interface IInputSplitQueue extends Remote {
+    public MarshalledWritable<OnlineFileSplit> getNext(UUID jobId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProvider.java
similarity index 80%
rename from hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java
rename to hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProvider.java
index 8c99d17..0a33636 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProvider.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProvider.java
@@ -14,9 +14,8 @@
  */
 package edu.uci.ics.hyracks.examples.onlineaggregation;
 
-import org.apache.hadoop.mapreduce.InputSplit;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-
-public interface IInputSplitProvider<IS extends InputSplit> {
-    public IS next();
+public interface IOnlineInputSplitProvider {
+    public OnlineFileSplit next() throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProviderFactory.java
similarity index 79%
rename from hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
rename to hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProviderFactory.java
index 1f9adba..60731b2 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IInputSplitProviderFactory.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/IOnlineInputSplitProviderFactory.java
@@ -15,11 +15,10 @@
 package edu.uci.ics.hyracks.examples.onlineaggregation;
 
 import java.io.Serializable;
-
-import org.apache.hadoop.mapreduce.InputSplit;
+import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public interface IInputSplitProviderFactory<P extends IInputSplitProvider> extends Serializable {
-    public P create(int id) throws HyracksDataException;
+public interface IOnlineInputSplitProviderFactory extends Serializable {
+    public IOnlineInputSplitProvider createInputSplitProvider(UUID jobId, int id) throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
index 78666a2..d2f6f16 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/MapperOperatorDescriptor.java
@@ -21,7 +21,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -48,10 +47,10 @@
     private static final long serialVersionUID = 1L;
     private final int jobId;
     private final MarshalledWritable<Configuration> config;
-    private final OnlineInputSplitProviderFactory factory;
+    private final IOnlineInputSplitProviderFactory factory;
 
     public MapperOperatorDescriptor(JobSpecification spec, int jobId, MarshalledWritable<Configuration> config,
-            OnlineInputSplitProviderFactory factory) throws HyracksDataException {
+            IOnlineInputSplitProviderFactory factory) throws HyracksDataException {
         super(spec, 0, 1);
         this.jobId = jobId;
         this.config = config;
@@ -68,7 +67,7 @@
         final Configuration conf = helper.getConfiguration();
         final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
         final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
-        final OnlineInputSplitProvider isp = factory.create(partition);
+        final IOnlineInputSplitProvider isp = factory.createInputSplitProvider(ctx.getJobId(), partition);
         final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
         final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
 
@@ -161,7 +160,8 @@
                     OnlineFileSplit split = null;
                     while ((split = isp.next()) != null) {
                         try {
-                            RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split, taskAttemptContext);
+                            RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
+                                    taskAttemptContext);
                             ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                             try {
                                 Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/NCBootstrap.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/NCBootstrap.java
index f3f8524..dc89300 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/NCBootstrap.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/NCBootstrap.java
@@ -1,26 +1,22 @@
 package edu.uci.ics.hyracks.examples.onlineaggregation;
 
-import edu.uci.ics.hyracks.api.application.IApplicationContext;
-import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
 
-public class NCBootstrap implements IBootstrap {
+public class NCBootstrap implements INCBootstrap {
+    INCApplicationContext appCtx;
 
-	@Override
-	public void start() throws Exception {
-		// TODO Auto-generated method stub
+    @Override
+    public void start() throws Exception {
+        CentralQueueAccessor.initialize(appCtx);
+    }
 
-	}
+    @Override
+    public void stop() throws Exception {
+    }
 
-	@Override
-	public void stop() throws Exception {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	public void setApplicationContext(IApplicationContext appCtx) {
-		// TODO Auto-generated method stub
-
-	}
-
-}
+    @Override
+    public void setApplicationContext(INCApplicationContext appCtx) {
+        this.appCtx = appCtx;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineFileSplit.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineFileSplit.java
index ebebc1c..9fb64e1 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineFileSplit.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineFileSplit.java
@@ -5,7 +5,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
 public class OnlineFileSplit extends FileSplit {
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProvider.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProvider.java
index 5f9ccb6..5250ef6 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProvider.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProvider.java
@@ -1,18 +1,28 @@
 package edu.uci.ics.hyracks.examples.onlineaggregation;
 
-import java.util.Queue;
+import java.util.UUID;
 
-public class OnlineInputSplitProvider implements IInputSplitProvider<OnlineFileSplit> {
-	
-	private Queue<OnlineFileSplit> queue;
-	
-	public OnlineInputSplitProvider(Queue<OnlineFileSplit> queue) {
-		this.queue = queue;
-	}
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-	@Override
-	public OnlineFileSplit next() {
-		return this.queue.size() > 0 ? this.queue.remove() : null;
-	}
+public class OnlineInputSplitProvider implements IOnlineInputSplitProvider {
+    private UUID jobId;
+    private IInputSplitQueue queue;
 
-}
+    public OnlineInputSplitProvider(UUID jobId, IInputSplitQueue queue) {
+        this.jobId = jobId;
+        this.queue = queue;
+    }
+
+    @Override
+    public OnlineFileSplit next() throws HyracksDataException {
+        try {
+            MarshalledWritable<OnlineFileSplit> w = queue.getNext(jobId);
+            if (w == null) {
+                return null;
+            }
+            return w.get();
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProviderFactory.java b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProviderFactory.java
index 0cdb582..950ed3d 100644
--- a/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProviderFactory.java
+++ b/hyracks-examples/onlineaggregation-example/onlineaggregationhelper/src/main/java/edu/uci/ics/hyracks/examples/onlineaggregation/OnlineInputSplitProviderFactory.java
@@ -14,36 +14,19 @@
  */
 package edu.uci.ics.hyracks.examples.onlineaggregation;
 
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
+import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public class OnlineInputSplitProviderFactory implements IInputSplitProviderFactory<OnlineInputSplitProvider> {
+public class OnlineInputSplitProviderFactory implements IOnlineInputSplitProviderFactory {
     private static final long serialVersionUID = 1L;
 
-    private MarshalledWritable<Configuration> mConfig;
-    
-    private Queue<OnlineFileSplit> splits;
-
-    public OnlineInputSplitProviderFactory(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
-        this.mConfig = mConfig;
-        this.splits = new LinkedBlockingQueue<OnlineFileSplit>();
-        
-        try {
-        	HadoopHelper helper = new HadoopHelper(mConfig);
-            JobContext jCtx = helper.createJobContext();
-            // TODO: this.splits.addAll(helper.getInputFormat().getSplits(jCtx));
-        } catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
+    public OnlineInputSplitProviderFactory() {
     }
 
     @Override
-    public OnlineInputSplitProvider create(int id) throws HyracksDataException {
-    	return new OnlineInputSplitProvider(this.splits);
+    public IOnlineInputSplitProvider createInputSplitProvider(UUID jobId, int id) throws HyracksDataException {
+        IInputSplitQueue queue = CentralQueueAccessor.getQueue();
+        return new OnlineInputSplitProvider(jobId, queue);
     }
 }
\ No newline at end of file