make genomix-core runable
svn merge -r2905:2910 https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_genomix@2912 123451ca-8445-de46-9d55-352943316053
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index bbe8fb3..63a6852 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -54,6 +54,8 @@
operatorVisitedToParents.clear();
builder.buildSpec(rootOps);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ // Do not do activity cluster planning because it is slow on large clusters
+ spec.setUseConnectorPolicyForScheduling(false);
return spec;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
index 5c5fdb1..a8864fe 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
@@ -38,7 +38,6 @@
if (context.checkIfInDontApplySet(this, op)) {
return false;
}
- context.addToDontApplySet(this, op);
if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
return false;
}
@@ -86,6 +85,7 @@
opRef3.setValue(newGbyOp);
typeGby(newGbyOp, context);
typeGby(gbyOp, context);
+ context.addToDontApplySet(this, op);
return true;
}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 7f65bc7..e924650 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -89,7 +89,6 @@
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
final InputSplit[] inputSplits = splitsFactory.getSplits();
- final JobConf conf = confFactory.getConf();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
private String nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
@@ -100,6 +99,7 @@
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ JobConf conf = confFactory.getConf();
IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
writer.open();
InputFormat inputFormat = conf.getInputFormat();
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index e29848c..ff97a29 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -71,20 +71,24 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
- final JobConf conf = confFactory.getConf();
- final String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private String fileName = outputDirPath + File.separator + "part-" + partition;
private FSDataOutputStream dos;
private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
private FrameTupleReference tuple = new FrameTupleReference();
private ITupleWriter tupleWriter;
+ private ClassLoader ctxCL;
@Override
public void open() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ JobConf conf = confFactory.getConf();
+ String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
+ String fileName = outputDirPath + File.separator + "part-" + partition;
+
tupleWriter = tupleWriterFactory.getTupleWriter();
try {
FileSystem dfs = FileSystem.get(conf);
@@ -115,6 +119,8 @@
dos.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 6ece1aa..b000c75 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -97,7 +97,6 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
- final Job conf = confFactory.getConf();
final List<FileSplit> inputSplits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -109,6 +108,7 @@
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Job conf = confFactory.getConf();
IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
writer.open();
InputFormat inputFormat = ReflectionUtils.newInstance(conf.getInputFormatClass(),
diff --git a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 32bb9dc..87c1c47 100644
--- a/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -72,21 +72,25 @@
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
- final Job conf = confFactory.getConf();
- final String outputPath = FileOutputFormat.getOutputPath(new JobContext(conf.getConfiguration(), new JobID()))
- .toString();
return new AbstractUnaryInputSinkOperatorNodePushable() {
- private String fileName = outputPath + File.separator + "part-" + partition;
private FSDataOutputStream dos;
private RecordDescriptor inputRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);;
private FrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRd);
private FrameTupleReference tuple = new FrameTupleReference();
private ITupleWriter tupleWriter;
+ private ClassLoader ctxCL;
@Override
public void open() throws HyracksDataException {
+ ctxCL = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ Job conf = confFactory.getConf();
+ String outputPath = FileOutputFormat
+ .getOutputPath(new JobContext(conf.getConfiguration(), new JobID())).toString();
+ String fileName = outputPath + File.separator + "part-" + partition;
+
tupleWriter = tupleWriterFactory.getTupleWriter();
try {
FileSystem dfs = FileSystem.get(conf.getConfiguration());
@@ -117,6 +121,8 @@
dos.close();
} catch (Exception e) {
throw new HyracksDataException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index dabc8a4..eb5f716 100644
--- a/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -58,6 +58,7 @@
ChannelControlBlock allocateChannel() throws NetException {
synchronized (mConn) {
+ //cleanupClosedChannels();
int idx = allocationBitmap.nextClearBit(0);
if (idx < 0 || idx >= ccbArray.length) {
cleanupClosedChannels();
@@ -231,4 +232,4 @@
ccbArray = Arrays.copyOf(ccbArray, ccbArray.length * 2);
}
}
-}
\ No newline at end of file
+}
diff --git a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
index 2792d88..0895386 100644
--- a/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
+++ b/pregelix/pregelix-example/src/main/java/edu/uci/ics/pregelix/example/ReachabilityVertex.java
@@ -116,10 +116,10 @@
@Override
public void compute(Iterator<ByteWritable> msgIterator) {
+ if (sourceId < 0) {
+ sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
+ }
if (getSuperstep() == 1) {
- if (sourceId < 0) {
- sourceId = getContext().getConfiguration().getLong(SOURCE_ID, SOURCE_ID_DEFAULT);
- }
boolean isSource = isSource(getVertexId());
if (isSource) {
tmpVertexValue.set((byte) 1);