make genomix-core runable
svn merge -r2905:2910  https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization


git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2912 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index bbe8fb3..63a6852 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -54,6 +54,8 @@
         operatorVisitedToParents.clear();
         builder.buildSpec(rootOps);
         spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        // Do not do activity cluster planning because it is slow on large clusters
+        spec.setUseConnectorPolicyForScheduling(false);
         return spec;
     }
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
index 5c5fdb1..a8864fe 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
@@ -38,7 +38,6 @@
         if (context.checkIfInDontApplySet(this, op)) {
             return false;
         }
-        context.addToDontApplySet(this, op);
         if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
             return false;
         }
@@ -86,6 +85,7 @@
         opRef3.setValue(newGbyOp);
         typeGby(newGbyOp, context);
         typeGby(gbyOp, context);
+    	context.addToDontApplySet(this, op);
         return true;
     }
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 7f65bc7..e924650 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -89,7 +89,6 @@
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
         final InputSplit[] inputSplits = splitsFactory.getSplits();
-        final JobConf conf = confFactory.getConf();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
             private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
@@ -100,6 +99,7 @@
                 ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
                     Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                    JobConf conf = confFactory.getConf();
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
                     InputFormat inputFormat = conf.getInputFormat();
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index e29848c..ff97a29 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -71,20 +71,24 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final JobConf conf = confFactory.getConf();
-        final String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
 
-            private String fileName = outputDirPath + File.separator + "part-" + partition;
             private FSDataOutputStream dos;
             private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
             private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
             private FrameTupleReference tuple = new FrameTupleReference();
             private ITupleWriter tupleWriter;
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                JobConf conf = confFactory.getConf();
+                String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
+                String fileName = outputDirPath + File.separator + "part-" + partition;
+
                 tupleWriter = tupleWriterFactory.getTupleWriter();
                 try {
                     FileSystem dfs = FileSystem.get(conf);
@@ -115,6 +119,8 @@
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 6ece1aa..b000c75 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -97,7 +97,6 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final Job conf = confFactory.getConf();
         final List<FileSplit> inputSplits = splitsFactory.getSplits();
 
         return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -109,6 +108,7 @@
                 ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
                 try {
                     Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                    Job conf = confFactory.getConf();
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
                     InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(),
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 32bb9dc..87c1c47 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -72,21 +72,25 @@
     public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
             throws HyracksDataException {
-        final Job conf = confFactory.getConf();
-        final String outputPath = FileOutputFormat.getOutputPath(new JobContext(conf.getConfiguration(), new JobID()))
-                .toString();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
 
-            private String fileName = outputPath + File.separator + "part-" + partition;
             private FSDataOutputStream dos;
             private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
             private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
             private FrameTupleReference tuple = new FrameTupleReference();
             private ITupleWriter tupleWriter;
+            private ClassLoader ctxCL;
 
             @Override
             public void open() throws HyracksDataException {
+                ctxCL = Thread.currentThread().getContextClassLoader();
+                Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+                Job conf = confFactory.getConf();
+                String outputPath = FileOutputFormat
+                        .getOutputPath(new JobContext(conf.getConfiguration(), new JobID())).toString();
+                String fileName = outputPath + File.separator + "part-" + partition;
+
                 tupleWriter = tupleWriterFactory.getTupleWriter();
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
@@ -117,6 +121,8 @@
                     dos.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
+                } finally {
+                    Thread.currentThread().setContextClassLoader(ctxCL);
                 }
             }
 
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index dabc8a4..eb5f716 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -58,6 +58,7 @@
 
     ChannelControlBlock allocateChannel() throws NetException {
         synchronized (mConn) {
+       	    //cleanupClosedChannels();
             int idx = allocationBitmap.nextClearBit(0);
             if (idx < 0 || idx >= ccbArray.length) {
                 cleanupClosedChannels();
@@ -231,4 +232,4 @@
             ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 2792d88..0895386 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -116,10 +116,10 @@
 
     @Override
     public void compute(Iterator<ByteWritable> msgIterator) {
+        if (sourceId < 0) {
+            sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+        }
         if (getSuperstep() == 1) {
-            if (sourceId < 0) {
-                sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
-            }
             boolean isSource = isSource(getVertexId());
             if (isSource) {
                 tmpVertexValue.set((byte) 1);